| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache; |
| |
| import static org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.getInstance; |
| import static org.apache.geode.logging.internal.spi.LogWriterLevel.ALL; |
| import static org.apache.geode.test.dunit.Assert.assertEquals; |
| import static org.apache.geode.test.dunit.Assert.assertFalse; |
| import static org.apache.geode.test.dunit.Assert.assertNotNull; |
| import static org.apache.geode.test.dunit.Assert.assertNull; |
| import static org.apache.geode.test.dunit.Assert.assertTrue; |
| import static org.apache.geode.test.dunit.Assert.fail; |
| import static org.junit.runners.MethodSorters.NAME_ASCENDING; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.junit.FixMethodOrder; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializable; |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.cache.client.NoAvailableServersException; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.Endpoint; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.cache30.CacheSerializableRunnable; |
| import org.apache.geode.cache30.CertifiableTestCacheListener; |
| import org.apache.geode.cache30.ClientServerTestCase; |
| import org.apache.geode.cache30.TestCacheLoader; |
| import org.apache.geode.cache30.TestCacheWriter; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.EntryExpiryTask; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.PoolStats; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifierStats; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.logging.LocalLogWriter; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.ThreadUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| |
| /** |
| * This class tests the client connection pool in GemFire. It does so by creating a cache server |
| * with a cache and a pre-defined region and a data loader. The client creates the same region with |
| * a pool (this happens in the controller VM). the client then spins up 10 different threads and |
| * issues gets on keys. The server data loader returns the data to the client. |
| * |
| * Test uses Groboutils TestRunnable objects to achieve multi threading behavior in the test. |
| */ |
| @Category({ClientServerTest.class}) |
| @FixMethodOrder(NAME_ASCENDING) |
| public class ConnectionPoolDUnitTest extends JUnit4CacheTestCase { |
| |
| /** The port on which the cache server was started in this VM */ |
| private static int bridgeServerPort; |
| |
| protected static int port = 0; |
| protected static int port2 = 0; |
| |
| protected static int numberOfAfterInvalidates; |
| protected static int numberOfAfterCreates; |
| protected static int numberOfAfterUpdates; |
| |
| protected static final int TYPE_CREATE = 0; |
| protected static final int TYPE_UPDATE = 1; |
| protected static final int TYPE_INVALIDATE = 2; |
| protected static final int TYPE_DESTROY = 3; |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| // avoid IllegalStateException from HandShake by connecting all vms to |
| // system before creating pool |
| getSystem(); |
| Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") { |
| @Override |
| public void run() { |
| getSystem(); |
| } |
| }); |
| postSetUpConnectionPoolDUnitTest(); |
| } |
| |
| protected void postSetUpConnectionPoolDUnitTest() throws Exception {} |
| |
| @Override |
| public final void postTearDownCacheTestCase() throws Exception { |
| Invoke.invokeInEveryVM(new SerializableRunnable() { |
| @Override |
| public void run() { |
| Map pools = PoolManager.getAll(); |
| if (!pools.isEmpty()) { |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .warning("found pools remaining after teardown: " + pools); |
| assertEquals(0, pools.size()); |
| } |
| } |
| }); |
| postTearDownConnectionPoolDUnitTest(); |
| |
| } |
| |
| protected void postTearDownConnectionPoolDUnitTest() throws Exception {} |
| |
| protected/* GemStoneAddition */ static PoolImpl getPool(Region r) { |
| PoolImpl result = null; |
| String poolName = r.getAttributes().getPoolName(); |
| if (poolName != null) { |
| result = (PoolImpl) PoolManager.find(poolName); |
| } |
| return result; |
| } |
| |
| protected static TestCacheWriter getTestWriter(Region r) { |
| return (TestCacheWriter) r.getAttributes().getCacheWriter(); |
| } |
| |
| /** |
| * Create a cache server on the given port without starting it. |
| * |
| * @since GemFire 5.0.2 |
| */ |
| protected void createBridgeServer(int port) throws IOException { |
| CacheServer bridge = getCache().addCacheServer(); |
| bridge.setPort(port); |
| bridge.setMaxThreads(getMaxThreads()); |
| bridgeServerPort = bridge.getPort(); |
| } |
| |
| /** |
| * Starts a cache server on the given port, using the given deserializeValues and |
| * notifyBySubscription to serve up the given region. |
| * |
| * @since GemFire 4.0 |
| */ |
| protected void startBridgeServer(int port) throws IOException { |
| startBridgeServer(port, -1); |
| } |
| |
| protected void startBridgeServer(int port, int socketBufferSize) throws IOException { |
| startBridgeServer(port, socketBufferSize, CacheServer.DEFAULT_LOAD_POLL_INTERVAL); |
| } |
| |
| protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval) |
| throws IOException { |
| |
| Cache cache = getCache(); |
| CacheServer bridge = cache.addCacheServer(); |
| bridge.setPort(port); |
| if (socketBufferSize != -1) { |
| bridge.setSocketBufferSize(socketBufferSize); |
| } |
| bridge.setMaxThreads(getMaxThreads()); |
| bridge.setLoadPollInterval(loadPollInterval); |
| bridge.start(); |
| bridgeServerPort = bridge.getPort(); |
| } |
| |
| /** |
| * By default return 0 which turns off selector and gives thread per cnx. Test subclasses can |
| * override to run with selector. |
| * |
| * @since GemFire 5.1 |
| */ |
| protected int getMaxThreads() { |
| return 0; |
| } |
| |
| /** |
| * Stops the cache server that serves up the given cache. |
| * |
| * @since GemFire 4.0 |
| */ |
| void stopBridgeServer(Cache cache) { |
| CacheServer bridge = cache.getCacheServers().iterator().next(); |
| bridge.stop(); |
| assertFalse(bridge.isRunning()); |
| } |
| |
| void stopBridgeServers(Cache cache) { |
| CacheServer bridge = null; |
| for (Iterator bsI = cache.getCacheServers().iterator(); bsI.hasNext();) { |
| bridge = (CacheServer) bsI.next(); |
| bridge.stop(); |
| assertFalse(bridge.isRunning()); |
| } |
| } |
| |
| private void restartBridgeServers(Cache cache) throws IOException { |
| CacheServer bridge = null; |
| for (Iterator bsI = cache.getCacheServers().iterator(); bsI.hasNext();) { |
| bridge = (CacheServer) bsI.next(); |
| bridge.start(); |
| assertTrue(bridge.isRunning()); |
| } |
| } |
| |
| protected InternalDistributedSystem createLonerDS() { |
| disconnectFromDS(); |
| InternalDistributedSystem ds = getLonerSystem(); |
| assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size()); |
| return ds; |
| } |
| |
| /** |
| * Returns region attributes for a <code>LOCAL</code> region |
| */ |
| protected RegionAttributes getRegionAttributes() { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior |
| return factory.create(); |
| } |
| |
| private static String createBridgeClientConnection(String host, int[] ports) { |
| StringBuffer sb = new StringBuffer(); |
| for (int i = 0; i < ports.length; i++) { |
| if (i > 0) |
| sb.append(","); |
| sb.append("name" + i + "="); |
| sb.append(host + ":" + ports[i]); |
| } |
| return sb.toString(); |
| } |
| |
| private class EventWrapper { |
| public final EntryEvent event; |
| public final Object key; |
| public final Object val; |
| public final Object arg; |
| public final int type; |
| |
| public EventWrapper(EntryEvent ee, int type) { |
| this.event = ee; |
| this.key = ee.getKey(); |
| this.val = ee.getNewValue(); |
| this.arg = ee.getCallbackArgument(); |
| this.type = type; |
| } |
| |
| public String toString() { |
| return "EventWrapper: event=" + event + ", type=" + type; |
| } |
| } |
| |
| protected class ControlListener extends CacheListenerAdapter { |
| public final LinkedList events = new LinkedList(); |
| public final Object CONTROL_LOCK = new Object(); |
| |
| public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) { |
| long maxMillis = System.currentTimeMillis() + sleepMs; |
| synchronized (this.CONTROL_LOCK) { |
| try { |
| while (this.events.size() < eventCount) { |
| long waitMillis = maxMillis - System.currentTimeMillis(); |
| if (waitMillis < 10) { |
| break; |
| } |
| this.CONTROL_LOCK.wait(waitMillis); |
| } |
| } catch (InterruptedException abort) { |
| fail("interrupted"); |
| } |
| return !this.events.isEmpty(); |
| } |
| } |
| |
| @Override |
| public void afterCreate(EntryEvent e) { |
| // System.out.println("afterCreate: " + e); |
| synchronized (this.CONTROL_LOCK) { |
| this.events.add(new EventWrapper(e, TYPE_CREATE)); |
| this.CONTROL_LOCK.notifyAll(); |
| } |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent e) { |
| // System.out.println("afterUpdate: " + e); |
| synchronized (this.CONTROL_LOCK) { |
| this.events.add(new EventWrapper(e, TYPE_UPDATE)); |
| this.CONTROL_LOCK.notifyAll(); |
| } |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent e) { |
| // System.out.println("afterInvalidate: " + e); |
| synchronized (this.CONTROL_LOCK) { |
| this.events.add(new EventWrapper(e, TYPE_INVALIDATE)); |
| this.CONTROL_LOCK.notifyAll(); |
| } |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent e) { |
| // System.out.println("afterDestroy: " + e); |
| synchronized (this.CONTROL_LOCK) { |
| this.events.add(new EventWrapper(e, TYPE_DESTROY)); |
| this.CONTROL_LOCK.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Create a fake EntryEvent that returns the provided region for {@link CacheEvent#getRegion()} |
| * and returns {@link org.apache.geode.cache.Operation#LOCAL_LOAD_CREATE} for |
| * {@link CacheEvent#getOperation()} |
| * |
| * @return fake entry event |
| */ |
| protected static EntryEvent createFakeyEntryEvent(final Region r) { |
| return new EntryEvent() { |
| @Override |
| public Operation getOperation() { |
| return Operation.LOCAL_LOAD_CREATE; // fake out pool to exit early |
| } |
| |
| @Override |
| public Region getRegion() { |
| return r; |
| } |
| |
| @Override |
| public Object getKey() { |
| return null; |
| } |
| |
| @Override |
| public Object getOldValue() { |
| return null; |
| } |
| |
| @Override |
| public boolean isOldValueAvailable() { |
| return true; |
| } |
| |
| @Override |
| public Object getNewValue() { |
| return null; |
| } |
| |
| public boolean isLocalLoad() { |
| return false; |
| } |
| |
| public boolean isNetLoad() { |
| return false; |
| } |
| |
| public boolean isLoad() { |
| return true; |
| } |
| |
| public boolean isNetSearch() { |
| return false; |
| } |
| |
| @Override |
| public TransactionId getTransactionId() { |
| return null; |
| } |
| |
| @Override |
| public Object getCallbackArgument() { |
| return null; |
| } |
| |
| @Override |
| public boolean isCallbackArgumentAvailable() { |
| return true; |
| } |
| |
| @Override |
| public boolean isOriginRemote() { |
| return false; |
| } |
| |
| @Override |
| public DistributedMember getDistributedMember() { |
| return null; |
| } |
| |
| public boolean isExpiration() { |
| return false; |
| } |
| |
| public boolean isDistributed() { |
| return false; |
| } |
| |
| public boolean isBridgeEvent() { |
| return hasClientOrigin(); |
| } |
| |
| @Override |
| public boolean hasClientOrigin() { |
| return false; |
| } |
| |
| public ClientProxyMembershipID getContext() { |
| return null; |
| } |
| |
| @Override |
| public SerializedCacheValue getSerializedOldValue() { |
| return null; |
| } |
| |
| @Override |
| public SerializedCacheValue getSerializedNewValue() { |
| return null; |
| } |
| }; |
| } |
| |
| public void verifyBalanced(final PoolImpl pool, int expectedServer, |
| final int expectedConsPerServer) { |
| verifyServerCount(pool, expectedServer); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return balanced(pool, expectedConsPerServer); |
| } |
| |
| @Override |
| public String description() { |
| return "expected " + expectedConsPerServer + " but endpoints=" + outOfBalanceReport(pool); |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals("expected " + expectedConsPerServer + " but endpoints=" + outOfBalanceReport(pool), |
| true, balanced(pool, expectedConsPerServer)); |
| } |
| |
| protected boolean balanced(PoolImpl pool, int expectedConsPerServer) { |
| Iterator it = pool.getEndpointMap().values().iterator(); |
| while (it.hasNext()) { |
| Endpoint ep = (Endpoint) it.next(); |
| if (ep.getStats().getConnections() != expectedConsPerServer) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| protected String outOfBalanceReport(PoolImpl pool) { |
| StringBuffer result = new StringBuffer(); |
| Iterator it = pool.getEndpointMap().values().iterator(); |
| result.append("<"); |
| while (it.hasNext()) { |
| Endpoint ep = (Endpoint) it.next(); |
| result.append("ep=" + ep); |
| result.append(" conCount=" + ep.getStats().getConnections()); |
| if (it.hasNext()) { |
| result.append(", "); |
| } |
| } |
| result.append(">"); |
| return result.toString(); |
| } |
| |
| public void waitForDenylistToClear(final PoolImpl pool) { |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return pool.getDenylistedServers().size() == 0; |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals("unexpected denylistedServers=" + pool.getDenylistedServers(), 0, |
| pool.getDenylistedServers().size()); |
| } |
| |
| public void verifyServerCount(final PoolImpl pool, final int expectedCount) { |
| getCache().getLogger().info("verifyServerCount expects=" + expectedCount); |
| WaitCriterion ev = new WaitCriterion() { |
| String excuse; |
| |
| @Override |
| public boolean done() { |
| int actual = pool.getConnectedServerCount(); |
| if (actual == expectedCount) { |
| return true; |
| } |
| excuse = "Found only " + actual + " servers, expected " + expectedCount; |
| return false; |
| } |
| |
| @Override |
| public String description() { |
| return excuse; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| } |
| |
| /** |
| * Tests that the callback argument is sent to the server |
| */ |
| @Test |
| public void test001CallbackArg() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| final Object createCallbackArg = "CREATE CALLBACK ARG"; |
| final Object updateCallbackArg = "PUT CALLBACK ARG"; |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| |
| CacheWriter cw = new TestCacheWriter() { |
| @Override |
| public final void beforeUpdate2(EntryEvent event) throws CacheWriterException { |
| Object beca = event.getCallbackArgument(); |
| assertEquals(updateCallbackArg, beca); |
| } |
| |
| @Override |
| public void beforeCreate2(EntryEvent event) throws CacheWriterException { |
| Object beca = event.getCallbackArgument(); |
| assertEquals(createCallbackArg, beca); |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, cw); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, NetworkUtils.getServerHostName(host), |
| port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm1.invoke(new CacheSerializableRunnable("Add entries") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.create(new Integer(i), "old" + i, createCallbackArg); |
| } |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "new" + i, updateCallbackArg); |
| } |
| } |
| }); |
| |
| vm0.invoke(new CacheSerializableRunnable("Check cache writer") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| TestCacheWriter writer = getTestWriter(region); |
| assertTrue(writer.wasInvoked()); |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| |
| } |
| |
| /** |
| * Tests that consecutive puts have the callback assigned appropriately. |
| */ |
| @Test |
| public void test002CallbackArg2() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| final Object createCallbackArg = "CREATE CALLBACK ARG"; |
| // final Object updateCallbackArg = "PUT CALLBACK ARG"; |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheWriter cw = new TestCacheWriter() { |
| @Override |
| public void beforeCreate2(EntryEvent event) throws CacheWriterException { |
| Integer key = (Integer) event.getKey(); |
| if (key.intValue() % 2 == 0) { |
| Object beca = event.getCallbackArgument(); |
| assertEquals(createCallbackArg, beca); |
| } else { |
| Object beca = event.getCallbackArgument(); |
| assertNull(beca); |
| } |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, cw); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm1.invoke(new CacheSerializableRunnable("Add entries") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| if (i % 2 == 0) { |
| region.create(new Integer(i), "old" + i, createCallbackArg); |
| |
| } else { |
| region.create(new Integer(i), "old" + i); |
| } |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| |
| vm0.invoke(new CacheSerializableRunnable("Check cache writer") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| TestCacheWriter writer = getTestWriter(region); |
| assertTrue(writer.wasInvoked()); |
| } |
| }); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests for bug 36684 by having two cache servers with cacheloaders that should always return a |
| * value and one client connected to each server reading values. If the bug exists, the clients |
| * will get null sometimes. |
| * |
| */ |
| @Test |
| public void test003Bug36684() throws CacheException, InterruptedException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| VM vm3 = host.getVM(3); |
| |
| // Create the cache servers with distributed, mirrored region |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| getSystem().getLogWriter().info("before create server"); |
| vm0.invoke(createServer); |
| vm1.invoke(createServer); |
| |
| // Create cache server clients |
| final int numberOfKeys = 1000; |
| final String host0 = NetworkUtils.getServerHostName(host); |
| final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| SerializableRunnable createClient = |
| new CacheSerializableRunnable("Create Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| // reset all static listener variables in case this is being rerun in a subclass |
| numberOfAfterInvalidates = 0; |
| numberOfAfterCreates = 0; |
| numberOfAfterUpdates = 0; |
| // create the region |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, vm1Port, true, -1, |
| -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| getSystem().getLogWriter().info("before create client"); |
| vm2.invoke(createClient); |
| vm3.invoke(createClient); |
| |
| // Initialize each client with entries (so that afterInvalidate is called) |
| SerializableRunnable initializeClient = new CacheSerializableRunnable("Initialize Client") { |
| @Override |
| public void run2() throws CacheException { |
| // StringBuffer errors = new StringBuffer(); |
| numberOfAfterInvalidates = 0; |
| numberOfAfterCreates = 0; |
| numberOfAfterUpdates = 0; |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfKeys; i++) { |
| String expected = "key-" + i; |
| String actual = (String) region.get("key-" + i); |
| assertEquals(expected, actual); |
| } |
| } |
| }; |
| |
| getSystem().getLogWriter().info("before initialize client"); |
| AsyncInvocation inv2 = vm2.invokeAsync(initializeClient); |
| AsyncInvocation inv3 = vm3.invokeAsync(initializeClient); |
| |
| ThreadUtils.join(inv2, 30 * 1000); |
| ThreadUtils.join(inv3, 30 * 1000); |
| |
| if (inv2.exceptionOccurred()) { |
| org.apache.geode.test.dunit.Assert.fail("Error occurred in vm2", inv2.getException()); |
| } |
| if (inv3.exceptionOccurred()) { |
| org.apache.geode.test.dunit.Assert.fail("Error occurred in vm3", inv3.getException()); |
| } |
| } |
| |
| /** |
| * Test for client connection loss with CacheLoader Exception on the server. |
| */ |
| @Test |
| public void test004ForCacheLoaderException() throws CacheException, InterruptedException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM server = host.getVM(0); |
| VM client = host.getVM(1); |
| |
| // Create the cache servers with distributed, mirrored region |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| System.out.println("### CALLING CACHE LOADER...."); |
| throw new CacheLoaderException( |
| "Test for CahceLoaderException causing Client connection to disconnect."); |
| } |
| |
| @Override |
| public void close() {} |
| }; |
| AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| getSystem().getLogWriter().info("before create server"); |
| |
| server.invoke(createServer); |
| |
| // Create cache server clients |
| final int numberOfKeys = 10; |
| final String host0 = NetworkUtils.getServerHostName(host); |
| final int[] port = |
| new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())}; |
| final String poolName = "myPool"; |
| |
| SerializableRunnable createClient = |
| new CacheSerializableRunnable("Create Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPoolWithName(factory, host0, port, true, -1, -1, |
| null, poolName); |
| createRegion(name, factory.create()); |
| } |
| }; |
| getSystem().getLogWriter().info("before create client"); |
| client.invoke(createClient); |
| |
| // Initialize each client with entries (so that afterInvalidate is called) |
| SerializableRunnable invokeServerCacheLaoder = |
| new CacheSerializableRunnable("Initialize Client") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| PoolStats stats = ((PoolImpl) PoolManager.find(poolName)).getStats(); |
| int oldConnects = stats.getConnects(); |
| int oldDisConnects = stats.getDisConnects(); |
| try { |
| for (int i = 0; i < numberOfKeys; i++) { |
| String actual = (String) region.get("key-" + i); |
| } |
| } catch (Exception ex) { |
| if (!(ex.getCause() instanceof CacheLoaderException)) { |
| fail( |
| "UnExpected Exception, expected to receive CacheLoaderException from server, instead found: " |
| + ex.getCause().getClass()); |
| } |
| } |
| int newConnects = stats.getConnects(); |
| int newDisConnects = stats.getDisConnects(); |
| // System.out.println("#### new connects/disconnects :" + newConnects + ":" + |
| // newDisConnects); |
| if (newConnects != oldConnects && newDisConnects != oldDisConnects) { |
| fail("New connection has created for Server side CacheLoaderException."); |
| } |
| } |
| }; |
| |
| getSystem().getLogWriter().info("before initialize client"); |
| AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLaoder); |
| |
| ThreadUtils.join(inv2, 30 * 1000); |
| SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }; |
| server.invoke(stopServer); |
| |
| } |
| |
| protected void validateDS() { |
| List l = InternalDistributedSystem.getExistingSystems(); |
| if (l.size() > 1) { |
| getSystem().getLogWriter().info("validateDS: size=" + l.size() + " isDedicatedAdminVM=" |
| + ClusterDistributionManager.isDedicatedAdminVM() + " l=" + l); |
| } |
| assertFalse(ClusterDistributionManager.isDedicatedAdminVM()); |
| assertEquals(1, l.size()); |
| } |
| |
| /** |
| * Tests the basic operations of the {@link Pool} |
| * |
| * @since GemFire 3.5 |
| */ |
| @Test |
| public void test006Pool() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(false); |
| factory.setCacheLoader(new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| // System.err.println("CacheServer data loader called"); |
| return helper.getKey().toString(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| validateDS(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| vm1.invoke(create); |
| |
| vm1.invoke(new CacheSerializableRunnable("Get values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get(new Integer(i)); |
| assertEquals(String.valueOf(i), value); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Update values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), new Integer(i)); |
| } |
| } |
| }); |
| |
| vm2.invoke(create); |
| vm2.invoke(new CacheSerializableRunnable("Validate values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get(new Integer(i)); |
| assertNotNull(value); |
| assertTrue(value instanceof Integer); |
| assertEquals(i, ((Integer) value).intValue()); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Close Pool") { |
| // do some special close validation here |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| String pName = region.getAttributes().getPoolName(); |
| PoolImpl p = (PoolImpl) PoolManager.find(pName); |
| assertEquals(false, p.isDestroyed()); |
| assertEquals(1, p.getAttachCount()); |
| try { |
| p.destroy(); |
| fail("expected IllegalStateException"); |
| } catch (IllegalStateException expected) { |
| } |
| region.localDestroyRegion(); |
| assertEquals(false, p.isDestroyed()); |
| assertEquals(0, p.getAttachCount()); |
| } |
| }); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests the BridgeServer failover (bug 31832). |
| */ |
| @Test |
| public void test007BridgeServerFailoverCnx1() throws CacheException { |
| disconnectAllFromDS(); |
| basicTestBridgeServerFailover(1); |
| } |
| |
| /** |
| * Test BridgeServer failover with connectionsPerServer set to 0 |
| */ |
| @Test |
| public void test008BridgeServerFailoverCnx0() throws CacheException { |
| basicTestBridgeServerFailover(0); |
| } |
| |
| private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| // Create two cache servers |
| SerializableRunnable createCacheServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| |
| vm0.invoke(createCacheServer); |
| vm1.invoke(createCacheServer); |
| |
| final int port0 = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| final int port1 = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| // final String host1 = getServerHostName(vm1.getHost()); |
| |
| // Create one bridge client in this VM |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port0, port1, true, -1, |
| cnxCount, null, 100); |
| |
| Region region = createRegion(name, factory.create()); |
| |
| // force connections to form |
| region.put("keyInit", new Integer(0)); |
| region.put("keyInit2", new Integer(0)); |
| } |
| }; |
| |
| vm2.invoke(create); |
| |
| // Launch async thread that puts objects into cache. This thread will execute until |
| // the test has ended (which is why the RegionDestroyedException and CacheClosedException |
| // are caught and ignored. If any other exception occurs, the test will fail. See |
| // the putAI.exceptionOccurred() assertion below. |
| AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| try { |
| for (int i = 0; i < 100000; i++) { |
| region.put("keyAI", new Integer(i)); |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ie) { |
| fail("interrupted"); |
| } |
| } |
| } catch (NoAvailableServersException ignore) { |
| /* ignore */ |
| } catch (RegionDestroyedException e) { // will be thrown when the test ends |
| /* ignore */ |
| } catch (CancelException e) { // will be thrown when the test ends |
| /* ignore */ |
| } |
| } |
| }); |
| |
| SerializableRunnable verify1Server = new CacheSerializableRunnable("verify1Server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| PoolImpl pool = getPool(region); |
| verifyServerCount(pool, 1); |
| } |
| }; |
| SerializableRunnable verify2Servers = new CacheSerializableRunnable("verify2Servers") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| PoolImpl pool = getPool(region); |
| verifyServerCount(pool, 2); |
| } |
| }; |
| |
| vm2.invoke(verify2Servers); |
| |
| SerializableRunnable stopCacheServer = new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }; |
| |
| final String expected = "java.io.IOException"; |
| final String addExpected = "<ExpectedException action=add>" + expected + "</ExpectedException>"; |
| final String removeExpected = |
| "<ExpectedException action=remove>" + expected + "</ExpectedException>"; |
| |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| LogWriter bgexecLogger = new LocalLogWriter(ALL.intLevel(), System.out); |
| bgexecLogger.info(addExpected); |
| } |
| }); |
| try { // make sure we removeExpected |
| |
| // Bounce the non-current server (I know that VM1 contains the non-current server |
| // because ... |
| vm1.invoke(stopCacheServer); |
| |
| vm2.invoke(verify1Server); |
| |
| final int restartPort = port1; |
| vm1.invoke(new SerializableRunnable("Restart CacheServer") { |
| @Override |
| public void run() { |
| try { |
| Region region = getRootRegion().getSubregion(name); |
| assertNotNull(region); |
| startBridgeServer(restartPort); |
| } catch (Exception e) { |
| getSystem().getLogWriter().fine(new Exception(e)); |
| org.apache.geode.test.dunit.Assert.fail("Failed to start CacheServer", e); |
| } |
| } |
| }); |
| |
| // Pause long enough for the monitor to realize the server has been bounced |
| // and reconnect to it. |
| vm2.invoke(verify2Servers); |
| |
| } finally { |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| LogWriter bgexecLogger = new LocalLogWriter(ALL.intLevel(), System.out); |
| bgexecLogger.info(removeExpected); |
| } |
| }); |
| } |
| |
| // Stop the other cache server |
| vm0.invoke(stopCacheServer); |
| |
| // Run awhile |
| vm2.invoke(verify1Server); |
| |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("FIXME: this thread does not terminate"); // FIXME |
| // // Verify that no exception has occurred in the putter thread |
| // join(putAI, 5 * 60 * 1000, getLogWriter()); |
| // //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred()); |
| // if (putAI.exceptionOccurred()) { |
| // fail("While putting entries: ", putAI.getException()); |
| // } |
| |
| // Close Pool |
| vm2.invoke(new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }); |
| |
| // Stop the last cache server |
| vm1.invoke(stopCacheServer); |
| } |
| |
| |
| protected static volatile boolean stopTestLifetimeExpire = false; |
| |
| protected static volatile int baselineLifetimeCheck; |
| protected static volatile int baselineLifetimeExtensions; |
| protected static volatile int baselineLifetimeConnect; |
| protected static volatile int baselineLifetimeDisconnect; |
| |
| @Test |
| public void basicTestLifetimeExpire() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| AsyncInvocation putAI = null; |
| AsyncInvocation putAI2 = null; |
| |
| try { |
| |
| // Create two cache servers |
| SerializableRunnable createCacheServer = |
| new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| factory.setCacheListener(new DelayListener(25)); |
| createRegion(name, factory.create()); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| |
| vm0.invoke(createCacheServer); |
| |
| final int port0 = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| vm1.invoke(createCacheServer); |
| final int port1 = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| SerializableRunnable stopCacheServer = new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }; |
| // we only had to stop it to reserve a port |
| vm1.invoke(stopCacheServer); |
| |
| |
| // Create one bridge client in this VM |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port0, port1, |
| false/* queue */, -1, 0, null, 100, 500, 500); |
| |
| Region region = createRegion(name, factory.create()); |
| |
| // force connections to form |
| region.put("keyInit", new Integer(0)); |
| region.put("keyInit2", new Integer(0)); |
| } |
| }; |
| |
| vm2.invoke(create); |
| |
| // Launch async thread that puts objects into cache. This thread will execute until |
| // the test has ended. |
| SerializableRunnable putter1 = new CacheSerializableRunnable("Put objects") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| PoolImpl pool = getPool(region); |
| PoolStats stats = pool.getStats(); |
| baselineLifetimeCheck = stats.getLoadConditioningCheck(); |
| baselineLifetimeExtensions = stats.getLoadConditioningExtensions(); |
| baselineLifetimeConnect = stats.getLoadConditioningConnect(); |
| baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect(); |
| try { |
| int count = 0; |
| while (!stopTestLifetimeExpire) { |
| count++; |
| region.put("keyAI1", new Integer(count)); |
| } |
| } catch (NoAvailableServersException ex) { |
| if (stopTestLifetimeExpire) { |
| return; |
| } else { |
| throw ex; |
| } |
| // } catch (RegionDestroyedException e) { //will be thrown when the test ends |
| // /*ignore*/ |
| // } catch (CancelException e) { //will be thrown when the test ends |
| // /*ignore*/ |
| } |
| } |
| }; |
| SerializableRunnable putter2 = new CacheSerializableRunnable("Put objects") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| try { |
| int count = 0; |
| while (!stopTestLifetimeExpire) { |
| count++; |
| region.put("keyAI2", new Integer(count)); |
| } |
| } catch (NoAvailableServersException ex) { |
| if (stopTestLifetimeExpire) { |
| return; |
| } else { |
| throw ex; |
| } |
| // } catch (RegionDestroyedException e) { //will be thrown when the test ends |
| // /*ignore*/ |
| // } catch (CancelException e) { //will be thrown when the test ends |
| // /*ignore*/ |
| } |
| } |
| }; |
| putAI = vm2.invokeAsync(putter1); |
| putAI2 = vm2.invokeAsync(putter2); |
| |
| SerializableRunnable verify1Server = new CacheSerializableRunnable("verify1Server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| PoolImpl pool = getPool(region); |
| final PoolStats stats = pool.getStats(); |
| verifyServerCount(pool, 1); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| |
| // make sure no replacements are happening. |
| // since we have 2 threads and 2 cnxs and 1 server |
| // when lifetimes are up we should only want to connect back to the |
| // server we are already connected to and thus just extend our lifetime |
| assertTrue( |
| "baselineLifetimeCheck=" + baselineLifetimeCheck |
| + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(), |
| stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck)); |
| baselineLifetimeCheck = stats.getLoadConditioningCheck(); |
| assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions); |
| assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect); |
| assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect); |
| } |
| }; |
| SerializableRunnable verify2Servers = new CacheSerializableRunnable("verify2Servers") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| PoolImpl pool = getPool(region); |
| final PoolStats stats = pool.getStats(); |
| verifyServerCount(pool, 2); |
| // make sure some replacements are happening. |
| // since we have 2 threads and 2 cnxs and 2 servers |
| // when lifetimes are up we should connect to the other server sometimes. |
| // int retry = 300; |
| // while ((retry-- > 0) |
| // && (stats.getLoadConditioningCheck() < (10+baselineLifetimeCheck))) { |
| // pause(100); |
| // } |
| // assertTrue("Bug 39209 expected " |
| // + stats.getLoadConditioningCheck() |
| // + " to be >= " |
| // + (10+baselineLifetimeCheck), |
| // stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck)); |
| |
| // TODO: does this WaitCriterion actually help? |
| WaitCriterion wc = new WaitCriterion() { |
| String excuse; |
| |
| @Override |
| public boolean done() { |
| int actual = stats.getLoadConditioningCheck(); |
| int expected = 10 + baselineLifetimeCheck; |
| if (actual >= expected) { |
| return true; |
| } |
| excuse = "Bug 39209 expected " + actual + " to be >= " + expected; |
| return false; |
| } |
| |
| @Override |
| public String description() { |
| return excuse; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(wc); |
| |
| assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect); |
| assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect); |
| } |
| }; |
| |
| vm2.invoke(verify1Server); |
| assertEquals(true, putAI.isAlive()); |
| assertEquals(true, putAI2.isAlive()); |
| } finally { |
| vm2.invoke(new SerializableRunnable("Stop Putters") { |
| @Override |
| public void run() { |
| stopTestLifetimeExpire = true; |
| } |
| }); |
| |
| try { |
| if (putAI != null) { |
| // Verify that no exception has occurred in the putter thread |
| ThreadUtils.join(putAI, 30 * 1000); |
| if (putAI.exceptionOccurred()) { |
| org.apache.geode.test.dunit.Assert.fail("While putting entries: ", |
| putAI.getException()); |
| } |
| } |
| |
| if (putAI2 != null) { |
| // Verify that no exception has occurred in the putter thread |
| ThreadUtils.join(putAI, 30 * 1000); |
| // FIXME this thread does not terminate |
| // if (putAI2.exceptionOccurred()) { |
| // fail("While putting entries: ", putAI.getException()); |
| // } |
| } |
| |
| } finally { |
| vm2.invoke(new SerializableRunnable("Stop Putters") { |
| @Override |
| public void run() { |
| stopTestLifetimeExpire = false; |
| } |
| }); |
| // Close Pool |
| vm2.invoke(new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| String poolName = region.getAttributes().getPoolName(); |
| region.localDestroyRegion(); |
| PoolManager.find(poolName).destroy(); |
| } |
| }); |
| |
| SerializableRunnable stopCacheServer = new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }; |
| vm1.invoke(stopCacheServer); |
| vm0.invoke(stopCacheServer); |
| } |
| } |
| } |
| |
| /** |
| * Tests the create operation of the {@link Pool} |
| * |
| * @since GemFire 3.5 |
| */ |
| @Test |
| public void test011PoolCreate() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm1.invoke(new CacheSerializableRunnable("Create values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.create(new Integer(i), new Integer(i)); |
| } |
| } |
| }); |
| |
| vm2.invoke(create); |
| vm2.invoke(new CacheSerializableRunnable("Validate values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get(new Integer(i)); |
| assertNotNull(value); |
| assertTrue(value instanceof Integer); |
| assertEquals(i, ((Integer) value).intValue()); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests the put operation of the {@link Pool} |
| * |
| * @since GemFire 3.5 |
| */ |
| @Test |
| public void test012PoolPut() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable createPool = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(createPool); |
| |
| vm1.invoke(new CacheSerializableRunnable("Put values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| // put string values |
| region.put("key-string-" + i, "value-" + i); |
| |
| // put object values |
| Order order = new Order(); |
| order.init(i); |
| region.put("key-object-" + i, order); |
| |
| // put byte[] values |
| region.put("key-bytes-" + i, ("value-" + i).getBytes()); |
| } |
| } |
| }); |
| |
| vm2.invoke(createPool); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get / validate string values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get("key-string-" + i); |
| assertNotNull(value); |
| assertTrue(value instanceof String); |
| assertEquals("value-" + i, value); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get / validate object values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get("key-object-" + i); |
| assertNotNull(value); |
| assertTrue(value instanceof Order); |
| assertEquals(i, ((Order) value).getIndex()); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get("key-bytes-" + i); |
| assertNotNull(value); |
| assertTrue(value instanceof byte[]); |
| assertEquals("value-" + i, new String((byte[]) value)); |
| } |
| } |
| }); |
| |
| SerializableRunnable closePool = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(closePool); |
| vm2.invoke(closePool); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests the put operation of the {@link Pool} |
| * |
| * @since GemFire 3.5 |
| */ |
| @Test |
| public void test013PoolPutNoDeserialize() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable createPool = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(createPool); |
| |
| vm1.invoke(new CacheSerializableRunnable("Put values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| // put string values |
| region.put("key-string-" + i, "value-" + i); |
| |
| // put object values |
| Order order = new Order(); |
| order.init(i); |
| region.put("key-object-" + i, order); |
| |
| // put byte[] values |
| region.put("key-bytes-" + i, ("value-" + i).getBytes()); |
| } |
| } |
| }); |
| |
| vm2.invoke(createPool); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get / validate string values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get("key-string-" + i); |
| assertNotNull(value); |
| assertTrue(value instanceof String); |
| assertEquals("value-" + i, value); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get / validate object values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get("key-object-" + i); |
| assertNotNull(value); |
| assertTrue(value instanceof Order); |
| assertEquals(i, ((Order) value).getIndex()); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object value = region.get("key-bytes-" + i); |
| assertNotNull(value); |
| assertTrue(value instanceof byte[]); |
| assertEquals("value-" + i, new String((byte[]) value)); |
| } |
| } |
| }); |
| |
| SerializableRunnable closePool = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(closePool); |
| vm2.invoke(closePool); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| Wait.pause(5 * 1000); |
| } |
| |
| /** |
| * Tests that invalidates and destroys are propagated to {@link Pool}s. |
| * |
| * @since GemFire 3.5 |
| */ |
| @Test |
| public void test014InvalidateAndDestroyPropagation() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| CertifiableTestCacheListener l = new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()); |
| factory.setCacheListener(l); |
| Region rgn = createRegion(name, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm1.invoke(new CacheSerializableRunnable("Populate region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "old" + i); |
| } |
| } |
| }); |
| vm2.invoke(create); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Turn on history") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| ctl.enableEventHistory(); |
| } |
| }); |
| vm2.invoke(new CacheSerializableRunnable("Update region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "new" + i, "callbackArg" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify invalidates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| ctl.waitForInvalidated(key); |
| Region.Entry entry = region.getEntry(key); |
| assertNotNull(entry); |
| assertNull(entry.getValue()); |
| } |
| { |
| List l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals("old" + i, ee.getOldValue()); |
| assertEquals(Operation.INVALIDATE, ee.getOperation()); |
| assertEquals("callbackArg" + i, ee.getCallbackArgument()); |
| assertEquals(true, ee.isOriginRemote()); |
| } |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| assertEquals("new" + i, region.getEntry(key).getValue()); |
| region.destroy(key, "destroyCB" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify destroys") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| ctl.waitForDestroyed(key); |
| Region.Entry entry = region.getEntry(key); |
| assertNull(entry); |
| } |
| { |
| List l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals(Operation.DESTROY, ee.getOperation()); |
| assertEquals("destroyCB" + i, ee.getCallbackArgument()); |
| assertEquals(true, ee.isOriginRemote()); |
| } |
| } |
| } |
| }); |
| vm2.invoke(new CacheSerializableRunnable("recreate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| region.create(key, "create" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify creates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| List l = ctl.getEventHistory(); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("history (should be empty): " + l); |
| assertEquals(0, l.size()); |
| // now see if we can get it from the server |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| assertEquals("create" + i, region.get(key, "loadCB" + i)); |
| } |
| l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("processing " + ee); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals("create" + i, ee.getNewValue()); |
| assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation()); |
| assertEquals("loadCB" + i, ee.getCallbackArgument()); |
| assertEquals(false, ee.isOriginRemote()); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests that invalidates and destroys are propagated to {@link Pool}s correctly to |
| * DataPolicy.EMPTY + InterestPolicy.ALL |
| * |
| * @since GemFire 5.0 |
| */ |
| @Test |
| public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable createEmpty = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| CertifiableTestCacheListener l = new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()); |
| factory.setCacheListener(l); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
| Region rgn = createRegion(name, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| } |
| }; |
| SerializableRunnable createNormal = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| CertifiableTestCacheListener l = new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()); |
| factory.setCacheListener(l); |
| Region rgn = createRegion(name, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| } |
| }; |
| |
| vm1.invoke(createEmpty); |
| vm1.invoke(new CacheSerializableRunnable("Populate region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "old" + i); |
| } |
| } |
| }); |
| |
| vm2.invoke(createNormal); |
| vm1.invoke(new CacheSerializableRunnable("Turn on history") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| ctl.enableEventHistory(); |
| } |
| }); |
| vm2.invoke(new CacheSerializableRunnable("Update region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "new" + i, "callbackArg" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify invalidates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| ctl.waitForInvalidated(key); |
| Region.Entry entry = region.getEntry(key); |
| assertNull(entry); // we are empty! |
| } |
| { |
| List l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals(false, ee.isOldValueAvailable()); // failure |
| assertEquals(Operation.INVALIDATE, ee.getOperation()); |
| assertEquals("callbackArg" + i, ee.getCallbackArgument()); |
| assertEquals(true, ee.isOriginRemote()); |
| } |
| } |
| |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| assertEquals("new" + i, region.getEntry(key).getValue()); |
| region.destroy(key, "destroyCB" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify destroys") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| ctl.waitForDestroyed(key); |
| Region.Entry entry = region.getEntry(key); |
| assertNull(entry); |
| } |
| { |
| List l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals(false, ee.isOldValueAvailable()); |
| assertEquals(Operation.DESTROY, ee.getOperation()); |
| assertEquals("destroyCB" + i, ee.getCallbackArgument()); |
| assertEquals(true, ee.isOriginRemote()); |
| } |
| } |
| } |
| }); |
| vm2.invoke(new CacheSerializableRunnable("recreate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| region.create(key, "create" + i, "createCB" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify creates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| ctl.waitForInvalidated(key); |
| Region.Entry entry = region.getEntry(key); |
| assertNull(entry); |
| } |
| List l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals(false, ee.isOldValueAvailable()); |
| assertEquals(Operation.INVALIDATE, ee.getOperation()); |
| assertEquals("createCB" + i, ee.getCallbackArgument()); |
| assertEquals(true, ee.isOriginRemote()); |
| } |
| // now see if we can get it from the server |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| assertEquals("create" + i, region.get(key, "loadCB" + i)); |
| } |
| l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals("create" + i, ee.getNewValue()); |
| assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation()); |
| assertEquals("loadCB" + i, ee.getCallbackArgument()); |
| assertEquals(false, ee.isOriginRemote()); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests that invalidates and destroys are propagated to {@link Pool}s correctly to |
| * DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT |
| * |
| * @since GemFire 5.0 |
| */ |
| @Test |
| public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable createEmpty = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| CertifiableTestCacheListener l = new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()); |
| factory.setCacheListener(l); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT)); |
| Region rgn = createRegion(name, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| } |
| }; |
| SerializableRunnable createNormal = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| CertifiableTestCacheListener l = new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()); |
| factory.setCacheListener(l); |
| Region rgn = createRegion(name, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| } |
| }; |
| |
| vm1.invoke(createEmpty); |
| vm1.invoke(new CacheSerializableRunnable("Populate region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "old" + i); |
| } |
| } |
| }); |
| |
| vm2.invoke(createNormal); |
| vm1.invoke(new CacheSerializableRunnable("Turn on history") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| ctl.enableEventHistory(); |
| } |
| }); |
| vm2.invoke(new CacheSerializableRunnable("Update region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), "new" + i, "callbackArg" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify invalidates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| List l = ctl.getEventHistory(); |
| assertEquals(0, l.size()); |
| } |
| }); |
| |
| |
| vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| assertEquals("new" + i, region.getEntry(key).getValue()); |
| region.destroy(key, "destroyCB" + i); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify destroys") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| List l = ctl.getEventHistory(); |
| assertEquals(0, l.size()); |
| } |
| }); |
| vm2.invoke(new CacheSerializableRunnable("recreate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| region.create(key, "create" + i, "createCB" + i); |
| } |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify creates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| List l = ctl.getEventHistory(); |
| assertEquals(0, l.size()); |
| // now see if we can get it from the server |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| assertEquals("create" + i, region.get(key, "loadCB" + i)); |
| } |
| l = ctl.getEventHistory(); |
| assertEquals(10, l.size()); |
| for (int i = 0; i < 10; i++) { |
| Object key = new Integer(i); |
| EntryEvent ee = (EntryEvent) l.get(i); |
| assertEquals(key, ee.getKey()); |
| assertEquals(null, ee.getOldValue()); |
| assertEquals("create" + i, ee.getNewValue()); |
| assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation()); |
| assertEquals("loadCB" + i, ee.getCallbackArgument()); |
| assertEquals(false, ee.isOriginRemote()); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests interest key registration. |
| */ |
| @Test |
| public void test017ExpireDestroyHasEntryInCallback() throws CacheException { |
| disconnectAllFromDS(); |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| // Create cache server |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| // In lieu of System.setProperty("gemfire.EXPIRE_SENDS_ENTRY_AS_CALLBACK", "true"); |
| EntryExpiryTask.expireSendsEntryAsCallback = true; |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| factory.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.DESTROY)); |
| createRegion(name, factory.create()); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| } |
| }); |
| |
| // Create cache server clients |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable createClient = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes((InterestPolicy.ALL))); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| CertifiableTestCacheListener l = new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()); |
| factory.setCacheListener(l); |
| |
| Region r = createRegion(name, factory.create()); |
| r.registerInterest("ALL_KEYS"); |
| } |
| }; |
| |
| vm1.invoke(createClient); |
| |
| vm1.invoke(new CacheSerializableRunnable("Turn on history") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| ctl.enableEventHistory(); |
| } |
| }); |
| Wait.pause(500); |
| |
| // Create some entries on the client |
| vm1.invoke(new CacheSerializableRunnable("Create entries") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 5; i++) { |
| region.put("key-client-" + i, "value-client-" + i); |
| } |
| } |
| }); |
| |
| // Create some entries on the server |
| vm0.invoke(new CacheSerializableRunnable("Create entries") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 5; i++) { |
| region.put("key-server-" + i, "value-server-" + i); |
| } |
| } |
| }); |
| |
| // Wait for expiration |
| Wait.pause(2000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Validate listener events") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); |
| int destroyCallbacks = 0; |
| List<CacheEvent> l = ctl.getEventHistory(); |
| for (CacheEvent ce : l) { |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("--->>> " + ce); |
| if (ce.getOperation() == Operation.DESTROY |
| && ce.getCallbackArgument() instanceof String) { |
| destroyCallbacks++; |
| } |
| } |
| assertEquals(10, destroyCallbacks); |
| } |
| }); |
| |
| // Close cache server clients |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| |
| // Stop cache server |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| public AttributesFactory getBridgeServerRegionAttributes(CacheLoader cl, CacheWriter cw) { |
| AttributesFactory ret = new AttributesFactory(); |
| if (cl != null) { |
| ret.setCacheLoader(cl); |
| } |
| if (cw != null) { |
| ret.setCacheWriter(cw); |
| } |
| ret.setScope(Scope.DISTRIBUTED_ACK); |
| ret.setConcurrencyChecksEnabled(false); |
| return ret; |
| } |
| |
| public AttributesFactory getBridgeServerMirroredRegionAttributes(CacheLoader cl, CacheWriter cw) { |
| AttributesFactory ret = new AttributesFactory(); |
| if (cl != null) { |
| ret.setCacheLoader(cl); |
| } |
| if (cw != null) { |
| ret.setCacheWriter(cw); |
| } |
| ret.setScope(Scope.DISTRIBUTED_NO_ACK); |
| ret.setDataPolicy(DataPolicy.REPLICATE); |
| ret.setConcurrencyChecksEnabled(false); |
| |
| return ret; |
| } |
| |
| public AttributesFactory getBridgeServerMirroredAckRegionAttributes(CacheLoader cl, |
| CacheWriter cw) { |
| AttributesFactory ret = new AttributesFactory(); |
| if (cl != null) { |
| ret.setCacheLoader(cl); |
| } |
| if (cw != null) { |
| ret.setCacheWriter(cw); |
| } |
| ret.setScope(Scope.DISTRIBUTED_ACK); |
| ret.setConcurrencyChecksEnabled(false); |
| ret.setMirrorType(MirrorType.KEYS_VALUES); |
| |
| return ret; |
| } |
| |
| /** |
| * Tests that updates are not sent to VMs that did not ask for them. |
| */ |
| @Test |
| public void test018OnlyRequestedUpdates() throws Exception { |
| final String name1 = this.getName() + "-1"; |
| final String name2 = this.getName() + "-2"; |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| // Cache server serves up both regions |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name1, factory.create()); |
| createRegion(name2, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| // vm1 sends updates to the server |
| vm1.invoke(new CacheSerializableRunnable("Create regions") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| |
| |
| Region rgn = createRegion(name1, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| rgn = createRegion(name2, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| |
| } |
| }); |
| |
| // vm2 only wants updates to updates to region1 |
| vm2.invoke(new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| |
| Region rgn = createRegion(name1, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| createRegion(name2, factory.create()); |
| // no interest registration for region 2 |
| } |
| }); |
| |
| SerializableRunnable populate = new CacheSerializableRunnable("Populate region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(name1); |
| for (int i = 0; i < 10; i++) { |
| region1.put(new Integer(i), "Region1Old" + i); |
| } |
| Region region2 = getRootRegion().getSubregion(name2); |
| for (int i = 0; i < 10; i++) { |
| region2.put(new Integer(i), "Region2Old" + i); |
| } |
| } |
| }; |
| vm1.invoke(populate); |
| vm2.invoke(populate); |
| |
| vm1.invoke(new CacheSerializableRunnable("Update") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(name1); |
| for (int i = 0; i < 10; i++) { |
| region1.put(new Integer(i), "Region1New" + i); |
| } |
| Region region2 = getRootRegion().getSubregion(name2); |
| for (int i = 0; i < 10; i++) { |
| region2.put(new Integer(i), "Region2New" + i); |
| } |
| } |
| }); |
| |
| // Wait for updates to be propagated |
| Wait.pause(5 * 1000); |
| |
| vm2.invoke(new CacheSerializableRunnable("Validate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(name1); |
| for (int i = 0; i < 10; i++) { |
| assertEquals("Region1New" + i, region1.get(new Integer(i))); |
| } |
| Region region2 = getRootRegion().getSubregion(name2); |
| for (int i = 0; i < 10; i++) { |
| assertEquals("Region2Old" + i, region2.get(new Integer(i))); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| // Terminate region1's Pool |
| Region region1 = getRootRegion().getSubregion(name1); |
| region1.localDestroyRegion(); |
| // Terminate region2's Pool |
| Region region2 = getRootRegion().getSubregion(name2); |
| region2.localDestroyRegion(); |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| // Terminate region1's Pool |
| Region region1 = getRootRegion().getSubregion(name1); |
| region1.localDestroyRegion(); |
| } |
| }); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| |
| /** |
| * Tests interest key registration. |
| */ |
| @Test |
| public void test019InterestKeyRegistration() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| // Create cache server |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| // Create cache server clients |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm2.invoke(create); |
| |
| // Get values for key 1 and key 2 so that there are entries in the clients. |
| // Register interest in one of the keys. |
| vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| assertEquals(region.get("key-1"), "key-1"); |
| assertEquals(region.get("key-2"), "key-2"); |
| try { |
| region.registerInterest("key-1"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| assertEquals(region.get("key-1"), "key-1"); |
| assertEquals(region.get("key-2"), "key-2"); |
| try { |
| region.registerInterest("key-2"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| // Put new values and validate updates (VM1) |
| vm1.invoke(new CacheSerializableRunnable("Put New Values") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "vm1-key-1"); |
| region.put("key-2", "vm1-key-2"); |
| // Verify that no invalidates occurred to this region |
| assertEquals(region.getEntry("key-1").getValue(), "vm1-key-1"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm1-key-2"); |
| } |
| }); |
| |
| Wait.pause(500); |
| vm2.invoke(new CacheSerializableRunnable("Validate Entries") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // Verify that 'key-2' was updated, but 'key-1' was not |
| // and contains the original value |
| assertEquals(region.getEntry("key-1").getValue(), "key-1"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm1-key-2"); |
| // assertNull(region.getEntry("key-2").getValue()); |
| } |
| }); |
| |
| // Put new values and validate updates (VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put New Values") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "vm2-key-1"); |
| region.put("key-2", "vm2-key-2"); |
| // Verify that no updates occurred to this region |
| assertEquals(region.getEntry("key-1").getValue(), "vm2-key-1"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm2-key-2"); |
| } |
| }); |
| |
| Wait.pause(500); |
| vm1.invoke(new CacheSerializableRunnable("Validate Entries") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // Verify that 'key-1' was updated, but 'key-2' was not |
| // and contains the original value |
| assertEquals(region.getEntry("key-2").getValue(), "vm1-key-2"); |
| assertEquals(region.getEntry("key-1").getValue(), "vm2-key-1"); |
| // assertNull(region.getEntry("key-1").getValue()); |
| } |
| }); |
| |
| // Unregister interest |
| vm1.invoke(new CacheSerializableRunnable("Unregister Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| region.unregisterInterest("key-1"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Unregister Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| region.unregisterInterest("key-2"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| // Put new values and validate updates (VM1) |
| vm1.invoke(new CacheSerializableRunnable("Put New Values") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "vm1-key-1-again"); |
| region.put("key-2", "vm1-key-2-again"); |
| // Verify that no updates occurred to this region |
| assertEquals(region.getEntry("key-1").getValue(), "vm1-key-1-again"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm1-key-2-again"); |
| } |
| }); |
| |
| Wait.pause(500); |
| vm2.invoke(new CacheSerializableRunnable("Validate Entries") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // Verify that neither 'key-1' 'key-2' was updated |
| // and contain the original value |
| assertEquals(region.getEntry("key-1").getValue(), "vm2-key-1"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm2-key-2"); |
| } |
| }); |
| |
| // Put new values and validate updates (VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put New Values") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "vm2-key-1-again"); |
| region.put("key-2", "vm2-key-2-again"); |
| // Verify that no updates occurred to this region |
| assertEquals(region.getEntry("key-1").getValue(), "vm2-key-1-again"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm2-key-2-again"); |
| } |
| }); |
| |
| Wait.pause(500); |
| vm1.invoke(new CacheSerializableRunnable("Validate Entries") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // Verify that neither 'key-1' 'key-2' was updated |
| // and contain the original value |
| assertEquals(region.getEntry("key-1").getValue(), "vm1-key-1-again"); |
| assertEquals(region.getEntry("key-2").getValue(), "vm1-key-2-again"); |
| } |
| }); |
| |
| // Unregister interest again (to verify that a client can unregister interest |
| // in a key that its not interested in with no problem. |
| vm1.invoke(new CacheSerializableRunnable("Unregister Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| region.unregisterInterest("key-1"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Unregister Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| region.unregisterInterest("key-2"); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| // Close cache server clients |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| // Stop cache server |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests interest list registration. |
| */ |
| @Test |
| public void test020InterestListRegistration() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| // Create cache server |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| // Create cache server clients |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm2.invoke(create); |
| |
| // Get values for key 1 and key 6 so that there are entries in the clients. |
| // Register interest in a list of keys. |
| vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| assertEquals(region.get("key-1"), "key-1"); |
| assertEquals(region.get("key-6"), "key-6"); |
| try { |
| List list = new ArrayList(); |
| list.add("key-1"); |
| list.add("key-2"); |
| list.add("key-3"); |
| list.add("key-4"); |
| list.add("key-5"); |
| region.registerInterest(list); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| assertEquals(region.get("key-1"), "key-1"); |
| assertEquals(region.get("key-6"), "key-6"); |
| } |
| }); |
| |
| // Put new values and validate updates (VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put New Values") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "vm2-key-1"); |
| region.put("key-6", "vm2-key-6"); |
| // Verify that no updates occurred to this region |
| assertEquals(region.getEntry("key-1").getValue(), "vm2-key-1"); |
| assertEquals(region.getEntry("key-6").getValue(), "vm2-key-6"); |
| } |
| }); |
| Wait.pause(5 * 1000); |
| |
| vm1.invoke(new CacheSerializableRunnable("Validate Entries") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // Verify that 'key-1' was updated |
| assertEquals(region.getEntry("key-1").getValue(), "vm2-key-1"); |
| // Verify that 'key-6' was not invalidated |
| assertEquals(region.getEntry("key-6").getValue(), "key-6"); |
| } |
| }); |
| |
| // Close cache server clients |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| // Stop cache server |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| protected class ConnectionPoolDUnitTestSerializable2 implements java.io.Serializable { |
| protected ConnectionPoolDUnitTestSerializable2(String key) { |
| _key = key; |
| } |
| |
| public String getKey() { |
| return _key; |
| } |
| |
| protected String _key; |
| } |
| |
| /** |
| * Accessed by reflection DO NOT REMOVE |
| * |
| */ |
| protected static int getCacheServerPort() { |
| return bridgeServerPort; |
| } |
| |
| protected static long getNumberOfAfterCreates() { |
| return numberOfAfterCreates; |
| } |
| |
| protected static long getNumberOfAfterUpdates() { |
| return numberOfAfterUpdates; |
| } |
| |
| protected static long getNumberOfAfterInvalidates() { |
| return numberOfAfterInvalidates; |
| } |
| |
| /** |
| * Creates a "loner" distributed system that has dynamic region creation enabled. |
| * |
| * @since GemFire 4.3 |
| */ |
| protected Cache createDynamicRegionCache(String testName, String connectionPoolName) { |
| // note that clients use non-persistent dr factories. |
| |
| DynamicRegionFactory.get() |
| .open(new DynamicRegionFactory.Config(null, connectionPoolName, false, true)); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("CREATED IT"); |
| Cache z = getCache(); |
| return z; |
| } |
| |
| /** |
| * A handy method to poll for arrival of non-null/non-invalid entries |
| * |
| * @param r the Region to poll |
| * @param key the key of the Entry to poll for |
| */ |
| public static void waitForEntry(final Region r, final Object key) { |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return r.containsValueForKey(key); |
| } |
| |
| @Override |
| public String description() { |
| return "Waiting for entry " + key + " on region " + r; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| } |
| |
| public static Region waitForSubRegion(final Region r, final String subRegName) { |
| // final long start = System.currentTimeMillis(); |
| final long MAXWAIT = 10000; |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return r.getSubregion(subRegName) != null; |
| } |
| |
| @Override |
| public String description() { |
| return "Waiting for subregion " + subRegName; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| Region result = r.getSubregion(subRegName); |
| return result; |
| } |
| |
| public static class CacheServerCacheLoader extends TestCacheLoader implements Declarable { |
| |
| public CacheServerCacheLoader() {} |
| |
| @Override |
| public Object load2(LoaderHelper helper) { |
| if (helper.getArgument() instanceof Integer) { |
| try { |
| Thread.sleep(((Integer) helper.getArgument()).intValue()); |
| } catch (InterruptedException ugh) { |
| fail("interrupted"); |
| } |
| } |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void init(Properties props) {} |
| } |
| |
| /** |
| * Create a server that has a value for every key queried and a unique key/value in the specified |
| * Region that uniquely identifies each instance. |
| * |
| * @param vm the VM on which to create the server |
| * @param rName the name of the Region to create on the server |
| * @param port the TCP port on which the server should listen |
| */ |
| public void createBridgeServer(VM vm, final String rName, final int port, |
| final boolean notifyBySubscription) { |
| vm.invoke(new CacheSerializableRunnable("Create Region on Server") { |
| @Override |
| public void run2() { |
| try { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(false); |
| factory.setCacheLoader(new CacheServerCacheLoader()); |
| beginCacheXml(); |
| createRegion(rName, factory.create()); |
| startBridgeServer(port); |
| finishCacheXml(rName + "-" + port); |
| |
| Region region = getRootRegion().getSubregion(rName); |
| assertNotNull(region); |
| assertNotNull(getRootRegion().getSubregion(rName)); |
| region.put("BridgeServer", new Integer(port)); // A unique key/value to identify the |
| // BridgeServer |
| } catch (Exception e) { |
| getSystem().getLogWriter().severe(e); |
| fail("Failed to start CacheServer " + e); |
| } |
| } |
| }); |
| } |
| |
| // test for bug 35884 |
| @Test |
| public void test021ClientGetOfInvalidServerEntry() throws CacheException { |
| final String regionName1 = this.getName() + "-1"; |
| |
| final Host host = Host.getHost(0); |
| VM server1 = host.getVM(0); |
| VM client = host.getVM(2); |
| |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setConcurrencyChecksEnabled(false); |
| createRegion(regionName1, factory.create()); |
| |
| Wait.pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| |
| // Create server1. |
| server1.invoke(createServer); |
| |
| final int port = server1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server1.getHost()); |
| |
| // Init values at server. |
| server1.invoke(new CacheSerializableRunnable("Create values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(regionName1); |
| // create it invalid |
| region1.create("key-string-1", null); |
| } |
| }); |
| |
| // now try it with a local scope |
| |
| SerializableRunnable createPool2 = new CacheSerializableRunnable("Create region 2") { |
| @Override |
| public void run2() throws CacheException { |
| // Region region1 = getRootRegion().getSubregion(regionName1); |
| // region1.localDestroyRegion(); |
| getLonerSystem(); |
| AttributesFactory regionFactory = new AttributesFactory(); |
| regionFactory.setScope(Scope.LOCAL); |
| regionFactory.setConcurrencyChecksEnabled(false); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("ZZZZZ host0:" + host0 + " port:" + port); |
| ClientServerTestCase.configureConnectionPool(regionFactory, host0, port, -1, false, -1, -1, |
| null); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("ZZZZZDone host0:" + host0 + " port:" + port); |
| createRegion(regionName1, regionFactory.create()); |
| } |
| }; |
| client.invoke(createPool2); |
| |
| // get the invalid entry on the client. |
| client.invoke(new CacheSerializableRunnable("get values on client") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(regionName1); |
| assertEquals(null, region1.getEntry("key-string-1")); |
| assertEquals(null, region1.get("key-string-1")); |
| } |
| }); |
| |
| server1.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| |
| } |
| |
| @Test |
| public void test022ClientRegisterUnregisterRequests() throws CacheException { |
| final String regionName1 = this.getName() + "-1"; |
| |
| final Host host = Host.getHost(0); |
| VM server1 = host.getVM(0); |
| VM client = host.getVM(2); |
| |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setConcurrencyChecksEnabled(false); |
| createRegion(regionName1, factory.create()); |
| |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| |
| // Create server1. |
| server1.invoke(createServer); |
| |
| final int port = server1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(server1.getHost()); |
| |
| SerializableRunnable createPool = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| Region region1 = null; |
| |
| AttributesFactory regionFactory = new AttributesFactory(); |
| regionFactory.setScope(Scope.LOCAL); |
| regionFactory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(regionFactory, host0, port, -1, true, -1, -1, |
| null); |
| |
| region1 = createRegion(regionName1, regionFactory.create()); |
| region1.getAttributesMutator().addCacheListener(new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter())); |
| } |
| }; |
| |
| // Create client. |
| client.invoke(createPool); |
| |
| // Init values at server. |
| server1.invoke(new CacheSerializableRunnable("Create values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(regionName1); |
| for (int i = 0; i < 20; i++) { |
| region1.put("key-string-" + i, "value-" + i); |
| } |
| } |
| }); |
| |
| |
| // Put some values on the client. |
| client.invoke(new CacheSerializableRunnable("Put values client") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(regionName1); |
| |
| for (int i = 0; i < 10; i++) { |
| region1.put("key-string-" + i, "client-value-" + i); |
| } |
| } |
| }); |
| |
| SerializableRunnable closePool = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region1 = getRootRegion().getSubregion(regionName1); |
| String pName = region1.getAttributes().getPoolName(); |
| region1.localDestroyRegion(); |
| PoolImpl p = (PoolImpl) PoolManager.find(pName); |
| p.destroy(); |
| } |
| }; |
| |
| client.invoke(closePool); |
| |
| SerializableRunnable validateClientRegisterUnRegister = |
| new CacheSerializableRunnable("validate Client Register UnRegister") { |
| @Override |
| public void run2() throws CacheException { |
| for (Iterator bi = getCache().getCacheServers().iterator(); bi.hasNext();) { |
| CacheServerImpl bsi = (CacheServerImpl) bi.next(); |
| final CacheClientNotifierStats ccnStats = |
| bsi.getAcceptor().getCacheClientNotifier().getStats(); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ccnStats.getClientRegisterRequests() == ccnStats |
| .getClientUnRegisterRequests(); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertEquals("HealthMonitor Client Register/UnRegister mismatch.", |
| ccnStats.getClientRegisterRequests(), ccnStats.getClientUnRegisterRequests()); |
| } |
| } |
| }; |
| |
| server1.invoke(validateClientRegisterUnRegister); |
| |
| server1.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| |
| } |
| |
| /** |
| * Tests the containsKeyOnServer operation of the {@link Pool} |
| * |
| * @since GemFire 5.0.2 |
| */ |
| @Test |
| public void test023ContainsKeyOnServer() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(false); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| vm1.invoke(create); |
| vm2.invoke(create); |
| |
| final Integer key1 = new Integer(0); |
| final String key2 = "0"; |
| vm2.invoke(new CacheSerializableRunnable("Contains key on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| boolean containsKey = false; |
| containsKey = region.containsKeyOnServer(key1); |
| assertFalse(containsKey); |
| containsKey = region.containsKeyOnServer(key2); |
| assertFalse(containsKey); |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Put values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.put(new Integer(0), new Integer(0)); |
| region.put("0", "0"); |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Contains key on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| boolean containsKey = false; |
| containsKey = region.containsKeyOnServer(key1); |
| assertTrue(containsKey); |
| containsKey = region.containsKeyOnServer(key2); |
| assertTrue(containsKey); |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests that invoking {@link Region#create} with a <code>null</code> value does the right thing |
| * with the {@link Pool}. |
| * |
| * @since GemFire 3.5 |
| */ |
| @Test |
| public void test024CreateNullValue() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| final Object createCallbackArg = "CREATE CALLBACK ARG"; |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| vm1.invoke(create); |
| |
| vm2.invoke(create); |
| vm2.invoke(new CacheSerializableRunnable("Create nulls") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.create(new Integer(i), null, createCallbackArg); |
| } |
| } |
| }); |
| |
| Wait.pause(1000); // Wait for updates to be propagated |
| |
| vm2.invoke(new CacheSerializableRunnable("Verify invalidates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Region.Entry entry = region.getEntry(new Integer(i)); |
| assertNotNull(entry); |
| assertNull(entry.getValue()); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Attempt to create values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.create(new Integer(i), "new" + i); |
| } |
| } |
| }); |
| |
| Wait.pause(1000); // Wait for updates to be propagated |
| |
| vm2.invoke(new CacheSerializableRunnable("Verify invalidates") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| Region.Entry entry = region.getEntry(new Integer(i)); |
| assertNotNull(entry); |
| assertNull(entry.getValue()); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests that a {@link Region#localDestroy} is not propagated to the server and that a |
| * {@link Region#destroy} is. Also makes sure that callback arguments are passed correctly. |
| */ |
| @Test |
| public void test025Destroy() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| final Object callbackArg = "DESTROY CALLBACK"; |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| |
| CacheWriter cw = new TestCacheWriter() { |
| @Override |
| public void beforeCreate2(EntryEvent event) throws CacheWriterException { |
| |
| } |
| |
| @Override |
| public void beforeDestroy2(EntryEvent event) throws CacheWriterException { |
| Object beca = event.getCallbackArgument(); |
| assertEquals(callbackArg, beca); |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, cw); |
| createRegion(name, factory.create()); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| |
| Region rgn = createRegion(name, factory.create()); |
| rgn.registerInterestRegex(".*", false, false); |
| } |
| }; |
| vm1.invoke(create); |
| vm1.invoke(new CacheSerializableRunnable("Populate region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), String.valueOf(i)); |
| } |
| } |
| }); |
| |
| vm2.invoke(create); |
| vm2.invoke(new CacheSerializableRunnable("Load region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| assertEquals(String.valueOf(i), region.get(new Integer(i))); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Local destroy") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.localDestroy(new Integer(i)); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("No destroy propagate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| assertEquals(String.valueOf(i), region.get(new Integer(i))); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Fetch from server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| assertEquals(String.valueOf(i), region.get(new Integer(i))); |
| } |
| } |
| }); |
| |
| vm0.invoke(new CacheSerializableRunnable("Check no server cache writer") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| TestCacheWriter writer = getTestWriter(region); |
| writer.wasInvoked(); |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Distributed destroy") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.destroy(new Integer(i), callbackArg); |
| } |
| } |
| }); |
| Wait.pause(1000); // Wait for destroys to propagate |
| |
| vm1.invoke(new CacheSerializableRunnable("Attempt get from server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| assertNull(region.getEntry(new Integer(i))); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Validate destroy propagate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| assertNull(region.getEntry(new Integer(i))); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests that a {@link Region#localDestroyRegion} is not propagated to the server and that a |
| * {@link Region#destroyRegion} is. Also makes sure that callback arguments are passed correctly. |
| */ |
| @Ignore("TODO") |
| @Test |
| public void testDestroyRegion() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| final Object callbackArg = "DESTROY CALLBACK"; |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| |
| CacheWriter cw = new TestCacheWriter() { |
| @Override |
| public void beforeCreate2(EntryEvent event) throws CacheWriterException { |
| |
| } |
| |
| @Override |
| public void beforeRegionDestroy2(RegionEvent event) throws CacheWriterException { |
| |
| assertEquals(callbackArg, event.getCallbackArgument()); |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, cw); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm2.invoke(create); |
| |
| vm1.invoke(new CacheSerializableRunnable("Local destroy region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| assertNull(getRootRegion().getSubregion(name)); |
| // close the bridge writer to prevent callbacks on the connections |
| // Not necessary since locally destroying the region takes care of this. |
| // getPoolClient(region).close(); |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("No destroy propagate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| assertNotNull(region); |
| } |
| }); |
| |
| vm0.invoke(new CacheSerializableRunnable("Check no server cache writer") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| TestCacheWriter writer = getTestWriter(region); |
| writer.wasInvoked(); |
| } |
| }); |
| |
| vm1.invoke(create); |
| |
| vm1.invoke(new CacheSerializableRunnable("Distributed destroy region") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| assertNotNull(region); |
| region.destroyRegion(callbackArg); |
| assertNull(getRootRegion().getSubregion(name)); |
| // close the bridge writer to prevent callbacks on the connections |
| // Not necessary since locally destroying the region takes care of this. |
| // getPoolClient(region).close(); |
| } |
| }); |
| Wait.pause(1000); // Wait for destroys to propagate |
| |
| vm2.invoke(new CacheSerializableRunnable("Verify destroy propagate") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| assertNull(region); |
| // todo close the bridge writer |
| // Not necessary since locally destroying the region takes care of this. |
| } |
| }); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| |
| } |
| |
| /** |
| * Tests interest list registration with callback arg with DataPolicy.EMPTY and InterestPolicy.ALL |
| */ |
| @Test |
| public void test026DPEmptyInterestListRegistrationWithCallbackArg() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| // Create cache server |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| Wait.pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| // Create cache server clients |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| factory.addCacheListener(new ControlListener()); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
| createRegion(name, factory.create()); |
| } |
| }; |
| SerializableRunnable createPublisher = |
| new CacheSerializableRunnable("Create publisher region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, |
| null); |
| factory.addCacheListener(new ControlListener()); |
| factory.setDataPolicy(DataPolicy.EMPTY); // make sure empty works with client publishers |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm2.invoke(createPublisher); |
| |
| // VM1 Register interest |
| vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| // This call will cause no value to be put into the region |
| region.registerInterest("key-1", InterestResultPolicy.NONE); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| // VM2 Put entry (this will cause a create event in both VM1 and VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put Value") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.create("key-1", "key-1-create", "key-1-create"); |
| } |
| }); |
| |
| // VM2 Put entry (this will cause an update event in both VM1 and VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put Value") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "key-1-update", "key-1-update"); |
| } |
| }); |
| |
| // VM2 Destroy entry (this will cause a destroy event) |
| vm2.invoke(new CacheSerializableRunnable("Destroy Entry") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.destroy("key-1", "key-1-destroy"); |
| } |
| }); |
| |
| final SerializableRunnable assertEvents = new CacheSerializableRunnable("Verify events") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0]; |
| int eventCount = 3; |
| listener.waitWhileNotEnoughEvents(60000, eventCount); |
| assertEquals(eventCount, listener.events.size()); |
| |
| { |
| EventWrapper ew = (EventWrapper) listener.events.get(0); |
| assertEquals(TYPE_CREATE, ew.type); |
| Object key = "key-1"; |
| assertEquals(key, ew.event.getKey()); |
| assertEquals(null, ew.event.getOldValue()); |
| assertEquals(false, ew.event.isOldValueAvailable()); // failure |
| assertEquals("key-1-create", ew.event.getNewValue()); |
| assertEquals(Operation.CREATE, ew.event.getOperation()); |
| assertEquals("key-1-create", ew.event.getCallbackArgument()); |
| assertEquals(true, ew.event.isOriginRemote()); |
| |
| ew = (EventWrapper) listener.events.get(1); |
| assertEquals(TYPE_UPDATE, ew.type); |
| assertEquals(key, ew.event.getKey()); |
| assertEquals(null, ew.event.getOldValue()); |
| assertEquals(false, ew.event.isOldValueAvailable()); |
| assertEquals("key-1-update", ew.event.getNewValue()); |
| assertEquals(Operation.UPDATE, ew.event.getOperation()); |
| assertEquals("key-1-update", ew.event.getCallbackArgument()); |
| assertEquals(true, ew.event.isOriginRemote()); |
| |
| ew = (EventWrapper) listener.events.get(2); |
| assertEquals(TYPE_DESTROY, ew.type); |
| assertEquals("key-1-destroy", ew.arg); |
| assertEquals(key, ew.event.getKey()); |
| assertEquals(null, ew.event.getOldValue()); |
| assertEquals(false, ew.event.isOldValueAvailable()); |
| assertEquals(null, ew.event.getNewValue()); |
| assertEquals(Operation.DESTROY, ew.event.getOperation()); |
| assertEquals("key-1-destroy", ew.event.getCallbackArgument()); |
| assertEquals(true, ew.event.isOriginRemote()); |
| } |
| } |
| }; |
| vm1.invoke(assertEvents); |
| |
| // Close cache server clients |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| // Stop cache server |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests interest list registration with callback arg with DataPolicy.EMPTY and |
| * InterestPolicy.CACHE_CONTENT |
| */ |
| @Test |
| public void test027DPEmptyCCInterestListRegistrationWithCallbackArg() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| // Create cache server |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| // Create cache server clients |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| factory.setCacheListener(new ControlListener()); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT)); |
| createRegion(name, factory.create()); |
| } |
| }; |
| SerializableRunnable createPublisher = |
| new CacheSerializableRunnable("Create publisher region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, |
| null); |
| factory.setCacheListener(new ControlListener()); |
| factory.setDataPolicy(DataPolicy.EMPTY); // make sure empty works with client publishers |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm2.invoke(createPublisher); |
| |
| // VM1 Register interest |
| vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| // This call will cause no value to be put into the region |
| region.registerInterest("key-1", InterestResultPolicy.NONE); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| // VM2 Put entry (this will cause a create event in both VM1 and VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put Value") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.create("key-1", "key-1-create", "key-1-create"); |
| } |
| }); |
| |
| // VM2 Put entry (this will cause an update event in both VM1 and VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put Value") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "key-1-update", "key-1-update"); |
| } |
| }); |
| |
| // VM2 Destroy entry (this will cause a destroy event) |
| vm2.invoke(new CacheSerializableRunnable("Destroy Entry") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.destroy("key-1", "key-1-destroy"); |
| } |
| }); |
| |
| final SerializableRunnable assertEvents = new CacheSerializableRunnable("Verify events") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0]; |
| Wait.pause(1000); // we should not get any events but give some time for the server to send |
| // them |
| assertEquals(0, listener.events.size()); |
| } |
| }; |
| vm1.invoke(assertEvents); |
| |
| // Close cache server clients |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| // Stop cache server |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Test dynamic region creation instantiated from a bridge client causing regions to be created on |
| * two different cache servers. |
| * |
| * Also tests the reverse situation, a dynamic region is created on the cache server expecting |
| * the same region to be created on the client. |
| * |
| * Note: This test re-creates Distributed Systems for its own purposes and uses a Loner |
| * distributed systems to isolate the Bridge Client. |
| * |
| */ |
| @Test |
| public void test028DynamicRegionCreation() throws Exception { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| final VM client1 = host.getVM(0); |
| // VM client2 = host.getVM(1); |
| final VM srv1 = host.getVM(2); |
| final VM srv2 = host.getVM(3); |
| |
| final String k1 = name + "-key1"; |
| final String v1 = name + "-val1"; |
| final String k2 = name + "-key2"; |
| final String v2 = name + "-val2"; |
| final String k3 = name + "-key3"; |
| final String v3 = name + "-val3"; |
| |
| client1.invoke(() -> disconnectFromDS()); |
| srv1.invoke(() -> disconnectFromDS()); |
| srv2.invoke(() -> disconnectFromDS()); |
| try { |
| // setup servers |
| CacheSerializableRunnable ccs = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| createDynamicRegionCache(name, (String) null); // Creates a new DS and Cache |
| assertTrue(DynamicRegionFactory.get().isOpen()); |
| try { |
| startBridgeServer(0); |
| } catch (IOException ugh) { |
| fail("cache server startup failed"); |
| } |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| factory.setConcurrencyChecksEnabled(false); |
| Region region = createRootRegion(name, factory.create()); |
| region.put(k1, v1); |
| Assert.assertTrue(region.get(k1).equals(v1)); |
| } |
| }; |
| srv1.invoke(ccs); |
| srv2.invoke(ccs); |
| |
| final String srv1Host = NetworkUtils.getServerHostName(srv1.getHost()); |
| final int srv1Port = srv1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| |
| final int srv2Port = srv2.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| // final String srv2Host = getServerHostName(srv2.getHost()); |
| |
| // setup clients, do basic tests to make sure pool with notifier work as advertised |
| client1.invoke(new CacheSerializableRunnable("Create Cache Client") { |
| @Override |
| public void run2() throws CacheException { |
| createLonerDS(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setConcurrencyChecksEnabled(false); |
| Pool cp = ClientServerTestCase.configureConnectionPool(factory, srv1Host, srv1Port, |
| srv2Port, true, -1, -1, null); |
| { |
| final PoolImpl pool = (PoolImpl) cp; |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| if (pool.getPrimary() == null) { |
| return false; |
| } |
| if (pool.getRedundants().size() < 1) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| assertNotNull(pool.getPrimary()); |
| assertTrue("backups=" + pool.getRedundants() + " expected=" + 1, |
| pool.getRedundants().size() >= 1); |
| } |
| |
| createDynamicRegionCache(name, "testPool"); |
| |
| assertTrue(DynamicRegionFactory.get().isOpen()); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| factory.setCacheListener(new CertifiableTestCacheListener( |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter())); |
| Region region = createRootRegion(name, factory.create()); |
| |
| |
| assertNull(region.getEntry(k1)); |
| region.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES); // this should match |
| // the key |
| assertEquals(v1, region.getEntry(k1).getValue()); // Update via registered interest |
| |
| assertNull(region.getEntry(k2)); |
| region.put(k2, v2); // use the Pool |
| assertEquals(v2, region.getEntry(k2).getValue()); // Ensure that the notifier didn't un-do |
| // the put, bug 35355 |
| |
| region.put(k3, v3); // setup a key for invalidation from a notifier |
| } |
| }); |
| |
| srv1.invoke(new CacheSerializableRunnable("Validate Server1 update") { |
| @Override |
| public void run2() throws CacheException { |
| CacheClientNotifier ccn = getInstance(); |
| final CacheClientNotifierStats ccnStats = ccn.getStats(); |
| final int eventCount = ccnStats.getEvents(); |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| assertEquals(v2, r.getEntry(k2).getValue()); // Validate the Pool worked, getEntry works |
| // because of the mirror |
| assertEquals(v3, r.getEntry(k3).getValue()); // Make sure we have the other entry to use |
| // for notification |
| r.put(k3, v1); // Change k3, sending some data to the client notifier |
| |
| // Wait for the update to propagate to the clients |
| final int maxTime = 20000; |
| // long start = System.currentTimeMillis(); |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return ccnStats.getEvents() > eventCount; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for ccnStat"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| // Set prox = ccn.getClientProxies(); |
| // assertIndexDetailsEquals(1, prox.size()); |
| // for (Iterator cpi = prox.iterator(); cpi.hasNext(); ) { |
| // CacheClientProxy ccp = (CacheClientProxy) cpi.next(); |
| // start = System.currentTimeMillis(); |
| // while (ccp.getMessagesProcessed() < 1) { |
| // assertTrue("Waited more than " + maxTime + "ms for client notification", |
| // (System.currentTimeMillis() - start) < maxTime); |
| // try { |
| // Thread.sleep(100); |
| // } catch (InterruptedException ine) { fail("Interrupted while waiting for client |
| // notifier to complete"); } |
| // } |
| // } |
| } |
| }); |
| srv2.invoke(new CacheSerializableRunnable("Validate Server2 update") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| assertEquals(v2, r.getEntry(k2).getValue()); // Validate the Pool worked, getEntry works |
| // because of the mirror |
| assertEquals(v1, r.getEntry(k3).getValue()); // From peer update |
| } |
| }); |
| client1.invoke(new CacheSerializableRunnable("Validate Client notification") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| CertifiableTestCacheListener ctl = |
| (CertifiableTestCacheListener) r.getAttributes().getCacheListener(); |
| ctl.waitForUpdated(k3); |
| assertEquals(v1, r.getEntry(k3).getValue()); // Ensure that the notifier updated the entry |
| } |
| }); |
| // Ok, now we are ready to do some dynamic region action! |
| final String v1Dynamic = v1 + "dynamic"; |
| final String dynFromClientName = name + "-dynamic-client"; |
| final String dynFromServerName = name + "-dynamic-server"; |
| client1.invoke(new CacheSerializableRunnable("Client dynamic region creation") { |
| @Override |
| public void run2() throws CacheException { |
| assertTrue(DynamicRegionFactory.get().isOpen()); |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| Region dr = DynamicRegionFactory.get().createDynamicRegion(name, dynFromClientName); |
| assertNull(dr.get(k1)); // This should be enough to validate the creation on the server |
| dr.put(k1, v1Dynamic); |
| assertEquals(v1Dynamic, dr.getEntry(k1).getValue()); |
| } |
| }); |
| // Assert the servers have the dynamic region and the new value |
| CacheSerializableRunnable valDR = |
| new CacheSerializableRunnable("Validate dynamic region creation on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| long end = System.currentTimeMillis() + 10000; |
| Region dr = null; |
| for (;;) { |
| try { |
| dr = r.getSubregion(dynFromClientName); |
| assertNotNull(dr); |
| assertNotNull(getCache().getRegion(name + Region.SEPARATOR + dynFromClientName)); |
| break; |
| } catch (AssertionError e) { |
| if (System.currentTimeMillis() > end) { |
| throw e; |
| } |
| } |
| } |
| |
| assertEquals(v1Dynamic, dr.getEntry(k1).getValue()); |
| } |
| }; |
| srv1.invoke(valDR); |
| srv2.invoke(valDR); |
| // now delete the dynamic region and see if it goes away on servers |
| client1.invoke(new CacheSerializableRunnable("Client dynamic region destruction") { |
| @Override |
| public void run2() throws CacheException { |
| assertTrue(DynamicRegionFactory.get().isActive()); |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| String drName = r.getFullPath() + Region.SEPARATOR + dynFromClientName; |
| |
| assertNotNull(getCache().getRegion(drName)); |
| DynamicRegionFactory.get().destroyDynamicRegion(drName); |
| assertNull(getCache().getRegion(drName)); |
| } |
| }); |
| // Assert the servers no longer have the dynamic region |
| CacheSerializableRunnable valNoDR = |
| new CacheSerializableRunnable("Validate dynamic region destruction on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| String drName = r.getFullPath() + Region.SEPARATOR + dynFromClientName; |
| assertNull(getCache().getRegion(drName)); |
| try { |
| DynamicRegionFactory.get().destroyDynamicRegion(drName); |
| fail("expected RegionDestroyedException"); |
| } catch (RegionDestroyedException expected) { |
| } |
| } |
| }; |
| srv1.invoke(valNoDR); |
| srv2.invoke(valNoDR); |
| // Now try the reverse, create a dynamic region on the server and see if the client |
| // has it |
| srv2.invoke(new CacheSerializableRunnable("Server dynamic region creation") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| Region dr = DynamicRegionFactory.get().createDynamicRegion(name, dynFromServerName); |
| assertNull(dr.get(k1)); |
| dr.put(k1, v1Dynamic); |
| assertEquals(v1Dynamic, dr.getEntry(k1).getValue()); |
| } |
| }); |
| // Assert the servers have the dynamic region and the new value |
| srv1.invoke(new CacheSerializableRunnable( |
| "Validate dynamic region creation propagation to other server") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| Region dr = waitForSubRegion(r, dynFromServerName); |
| assertNotNull(dr); |
| assertNotNull(getCache().getRegion(name + Region.SEPARATOR + dynFromServerName)); |
| waitForEntry(dr, k1); |
| assertNotNull(dr.getEntry(k1)); |
| assertEquals(v1Dynamic, dr.getEntry(k1).getValue()); |
| } |
| }); |
| // Assert the clients have the dynamic region and the new value |
| client1.invoke(new CacheSerializableRunnable("Validate dynamic region creation on client") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| long end = System.currentTimeMillis() + 10000; |
| Region dr = null; |
| for (;;) { |
| try { |
| dr = r.getSubregion(dynFromServerName); |
| assertNotNull(dr); |
| assertNotNull(getCache().getRegion(name + Region.SEPARATOR + dynFromServerName)); |
| break; |
| } catch (AssertionError e) { |
| if (System.currentTimeMillis() > end) { |
| throw e; |
| } else { |
| Wait.pause(1000); |
| } |
| } |
| } |
| waitForEntry(dr, k1); |
| assertNotNull(dr.getEntry(k1)); |
| assertEquals(v1Dynamic, dr.getEntry(k1).getValue()); |
| } |
| }); |
| // now delete the dynamic region on a server and see if it goes away on client |
| srv2.invoke(new CacheSerializableRunnable("Server dynamic region destruction") { |
| @Override |
| public void run2() throws CacheException { |
| assertTrue(DynamicRegionFactory.get().isActive()); |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| String drName = r.getFullPath() + Region.SEPARATOR + dynFromServerName; |
| |
| assertNotNull(getCache().getRegion(drName)); |
| DynamicRegionFactory.get().destroyDynamicRegion(drName); |
| assertNull(getCache().getRegion(drName)); |
| } |
| }); |
| srv1.invoke( |
| new CacheSerializableRunnable("Validate dynamic region destruction on other server") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| String drName = r.getFullPath() + Region.SEPARATOR + dynFromServerName; |
| { |
| int retry = 100; |
| while (retry-- > 0 && getCache().getRegion(drName) != null) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| } |
| } |
| assertNull(getCache().getRegion(drName)); |
| } |
| }); |
| // Assert the clients no longer have the dynamic region |
| client1 |
| .invoke(new CacheSerializableRunnable("Validate dynamic region destruction on client") { |
| @Override |
| public void run2() throws CacheException { |
| Region r = getRootRegion(name); |
| assertNotNull(r); |
| String drName = r.getFullPath() + Region.SEPARATOR + dynFromServerName; |
| { |
| int retry = 100; |
| while (retry-- > 0 && getCache().getRegion(drName) != null) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| } |
| } |
| assertNull(getCache().getRegion(drName)); |
| // sleep to make sure that the dynamic region entry from the internal |
| // region,dynamicRegionList in DynamicRegionFactory // ? |
| try { |
| Thread.sleep(10000); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| try { |
| DynamicRegionFactory.get().destroyDynamicRegion(drName); |
| fail("expected RegionDestroyedException"); |
| } catch (RegionDestroyedException expected) { |
| } |
| } |
| }); |
| } finally { |
| client1.invoke(() -> disconnectFromDS()); // clean-up loner |
| srv1.invoke(() -> disconnectFromDS()); |
| srv2.invoke(() -> disconnectFromDS()); |
| } |
| } |
| |
| |
| /** |
| * Test for bug 36279 |
| */ |
| @Test |
| public void test029EmptyByteArray() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| final Object createCallbackArg = "CREATE CALLBACK ARG"; |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm1.invoke(new CacheSerializableRunnable("Create empty byte array") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 1; i++) { |
| region.create(new Integer(i), new byte[0], createCallbackArg); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Verify values on client") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 1; i++) { |
| Region.Entry entry = region.getEntry(new Integer(i)); |
| assertNotNull(entry); |
| byte[] value = (byte[]) entry.getValue(); |
| assertNotNull(value); |
| assertEquals(0, value.length); |
| } |
| } |
| }); |
| vm0.invoke(new CacheSerializableRunnable("Verify values on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 1; i++) { |
| Region.Entry entry = region.getEntry(new Integer(i)); |
| assertNotNull(entry); |
| byte[] value = (byte[]) entry.getValue(); |
| assertNotNull(value); |
| assertEquals(0, value.length); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests interest list registration with callback arg |
| */ |
| @Test |
| public void test030InterestListRegistrationWithCallbackArg() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = Host.getHost(0).getVM(2); |
| |
| // Create cache server |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| |
| // Create cache server clients |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| factory.setCacheListener(new ControlListener()); |
| createRegion(name, factory.create()); |
| } |
| }; |
| |
| vm1.invoke(create); |
| vm2.invoke(create); |
| |
| // VM1 Register interest |
| vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| try { |
| // This call will cause no value to be put into the region |
| region.registerInterest("key-1", InterestResultPolicy.NONE); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex); |
| } |
| } |
| }); |
| |
| // VM2 Put entry (this will cause a create event in both VM1 and VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put Value") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.create("key-1", "key-1-create", "key-1-create"); |
| } |
| }); |
| |
| // VM2 Put entry (this will cause an update event in both VM1 and VM2) |
| vm2.invoke(new CacheSerializableRunnable("Put Value") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.put("key-1", "key-1-update", "key-1-update"); |
| } |
| }); |
| |
| // VM2 Destroy entry (this will cause a destroy event) |
| vm2.invoke(new CacheSerializableRunnable("Destroy Entry") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.destroy("key-1", "key-1-destroy"); |
| } |
| }); |
| |
| final SerializableRunnable assertEvents = new CacheSerializableRunnable("Verify events") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0]; |
| int eventCount = 3; |
| listener.waitWhileNotEnoughEvents(60000, eventCount); |
| assertEquals(eventCount, listener.events.size()); |
| |
| { |
| EventWrapper ew = (EventWrapper) listener.events.get(0); |
| assertEquals(ew.type, TYPE_CREATE); |
| Object key = "key-1"; |
| assertEquals(key, ew.event.getKey()); |
| assertEquals(null, ew.event.getOldValue()); |
| assertEquals("key-1-create", ew.event.getNewValue()); |
| assertEquals(Operation.CREATE, ew.event.getOperation()); |
| assertEquals("key-1-create", ew.event.getCallbackArgument()); |
| assertEquals(true, ew.event.isOriginRemote()); |
| |
| ew = (EventWrapper) listener.events.get(1); |
| assertEquals(ew.type, TYPE_UPDATE); |
| assertEquals(key, ew.event.getKey()); |
| assertEquals("key-1-create", ew.event.getOldValue()); |
| assertEquals("key-1-update", ew.event.getNewValue()); |
| assertEquals(Operation.UPDATE, ew.event.getOperation()); |
| assertEquals("key-1-update", ew.event.getCallbackArgument()); |
| assertEquals(true, ew.event.isOriginRemote()); |
| |
| ew = (EventWrapper) listener.events.get(2); |
| assertEquals(ew.type, TYPE_DESTROY); |
| assertEquals("key-1-destroy", ew.arg); |
| assertEquals(key, ew.event.getKey()); |
| assertEquals("key-1-update", ew.event.getOldValue()); |
| assertEquals(null, ew.event.getNewValue()); |
| assertEquals(Operation.DESTROY, ew.event.getOperation()); |
| assertEquals("key-1-destroy", ew.event.getCallbackArgument()); |
| assertEquals(true, ew.event.isOriginRemote()); |
| } |
| } |
| }; |
| vm1.invoke(assertEvents); |
| |
| // Close cache server clients |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| // Stop cache server |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| /** |
| * Tests the keySetOnServer operation of the {@link Pool} |
| * |
| * @since GemFire 5.0.2 |
| */ |
| @Test |
| public void test031KeySetOnServer() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(false); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| vm1.invoke(create); |
| vm2.invoke(create); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get keys on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| Set keySet = region.keySetOnServer(); |
| assertNotNull(keySet); |
| assertEquals(0, keySet.size()); |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Put values") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put(new Integer(i), new Integer(i)); |
| } |
| } |
| }); |
| |
| vm2.invoke(new CacheSerializableRunnable("Get keys on server") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| Set keySet = region.keySetOnServer(); |
| assertNotNull(keySet); |
| assertEquals(10, keySet.size()); |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| vm1.invoke(close); |
| vm2.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| |
| // this test doesn't do anything so I commented it out |
| // /** |
| // * Tests that new connections update client notification connections. |
| // */ |
| // public void test032NewConnections() throws Exception { |
| // final String name = this.getName(); |
| // final Host host = Host.getHost(0); |
| // VM vm0 = host.getVM(0); |
| // VM vm1 = host.getVM(1); |
| // VM vm2 = host.getVM(2); |
| |
| // // Cache server serves up the region |
| // vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| // public void run2() throws CacheException { |
| // AttributesFactory factory = getBridgeServerRegionAttributes(null,null); |
| // Region region = createRegion(name, factory.create()); |
| // pause(1000); |
| // try { |
| // startBridgeServer(0); |
| |
| // } catch (Exception ex) { |
| // fail("While starting CacheServer", ex); |
| // } |
| |
| // } |
| // }); |
| // final int port = |
| // vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| // final String host0 = getServerHostName(vm0.getHost()); |
| |
| // SerializableRunnable create = |
| // new CacheSerializableRunnable("Create region") { |
| // public void run2() throws CacheException { |
| // getCache(); |
| // AttributesFactory factory = new AttributesFactory(); |
| // factory.setScope(Scope.LOCAL); |
| |
| // ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null); |
| |
| // createRegion(name, factory.create()); |
| // } |
| // }; |
| |
| // vm1.invoke(create); |
| // vm2.invoke(create); |
| |
| // vm1.invoke(new CacheSerializableRunnable("Create new connection") { |
| // public void run2() throws CacheException { |
| // Region region = getRootRegion().getSubregion(name); |
| // BridgeClient writer = getPoolClient(region); |
| // Endpoint[] endpoints = (Endpoint[])writer.getEndpoints(); |
| // for (int i=0; i<endpoints.length; i++) endpoints[i].addNewConnection(); |
| // } |
| // }); |
| |
| // SerializableRunnable close = |
| // new CacheSerializableRunnable("Close Pool") { |
| // public void run2() throws CacheException { |
| // Region region = getRootRegion().getSubregion(name); |
| // region.localDestroyRegion(); |
| // } |
| // }; |
| |
| // vm1.invoke(close); |
| // vm2.invoke(close); |
| |
| // vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| // public void run() { |
| // stopBridgeServer(getCache()); |
| // } |
| // }); |
| // } |
| |
| /** |
| * Tests that creating, putting and getting a non-serializable key or value throws the correct |
| * (NotSerializableException) exception. |
| */ |
| @Test |
| public void test033NotSerializableException() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| // VM vm2 = host.getVM(2); |
| |
| vm0.invoke(new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = getBridgeServerRegionAttributes(null, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }); |
| final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final String host0 = NetworkUtils.getServerHostName(vm0.getHost()); |
| |
| SerializableRunnable create = new CacheSerializableRunnable("Create region") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| getCache(); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| |
| ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null); |
| createRegion(name, factory.create()); |
| } |
| }; |
| vm1.invoke(create); |
| |
| vm1.invoke(new CacheSerializableRunnable("Attempt to create a non-serializable value") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| try { |
| region.create(new Integer(1), new ConnectionPoolTestNonSerializable()); |
| fail("Should not have been able to create a ConnectionPoolTestNonSerializable"); |
| } catch (Exception e) { |
| if (!(e.getCause() instanceof java.io.NotSerializableException)) |
| fail("Unexpected exception while creating a non-serializable value " + e); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Attempt to put a non-serializable value") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| try { |
| region.put(new Integer(1), new ConnectionPoolTestNonSerializable()); |
| fail("Should not have been able to put a ConnectionPoolTestNonSerializable"); |
| } catch (Exception e) { |
| if (!(e.getCause() instanceof java.io.NotSerializableException)) |
| fail("Unexpected exception while putting a non-serializable value " + e); |
| } |
| } |
| }); |
| |
| vm1.invoke(new CacheSerializableRunnable("Attempt to get a non-serializable key") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| try { |
| region.get(new ConnectionPoolTestNonSerializable()); |
| fail("Should not have been able to get a ConnectionPoolTestNonSerializable"); |
| } catch (Exception e) { |
| if (!(e.getCause() instanceof java.io.NotSerializableException)) |
| fail("Unexpected exception while getting a non-serializable key " + e); |
| } |
| } |
| }); |
| |
| SerializableRunnable close = new CacheSerializableRunnable("Close Pool") { |
| @Override |
| public void run2() throws CacheException { |
| Region region = getRootRegion().getSubregion(name); |
| region.localDestroyRegion(); |
| } |
| }; |
| |
| vm1.invoke(close); |
| |
| vm0.invoke(new SerializableRunnable("Stop CacheServer") { |
| @Override |
| public void run() { |
| stopBridgeServer(getCache()); |
| } |
| }); |
| } |
| |
| protected class ConnectionPoolTestNonSerializable { |
| protected ConnectionPoolTestNonSerializable() {} |
| } |
| |
| /** |
| * Tests 'notify-all' client updates. This test verifies that: - only invalidates are sent as part |
| * of the 'notify-all' mode of client updates - originators of updates are not sent invalidates - |
| * non-originators of updates are sent invalidates - multiple invalidates are not sent for the |
| * same update |
| */ |
| @Test |
| public void test034NotifyAllUpdates() throws CacheException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| VM vm3 = host.getVM(3); |
| |
| disconnectAllFromDS(); |
| |
| // Create the cache servers with distributed, mirrored region |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| CacheLoader cl = new CacheLoader() { |
| @Override |
| public Object load(LoaderHelper helper) { |
| return helper.getKey(); |
| } |
| |
| @Override |
| public void close() { |
| |
| } |
| }; |
| AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| |
| } |
| }; |
| getSystem().getLogWriter().info("before create server"); |
| vm0.invoke(createServer); |
| vm1.invoke(createServer); |
| |
| // Create cache server clients |
| final int numberOfKeys = 10; |
| final String host0 = NetworkUtils.getServerHostName(host); |
| final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| SerializableRunnable createClient = |
| new CacheSerializableRunnable("Create Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| // reset all static listener variables in case this is being rerun in a subclass |
| numberOfAfterInvalidates = 0; |
| numberOfAfterCreates = 0; |
| numberOfAfterUpdates = 0; |
| getLonerSystem(); |
| // create the region |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, vm1Port, true, -1, |
| -1, null); |
| Region rgn = createRegion(name, factory.create()); |
| } |
| }; |
| getSystem().getLogWriter().info("before create client"); |
| vm2.invoke(createClient); |
| vm3.invoke(createClient); |
| |
| // Initialize each client with entries (so that afterInvalidate is called) |
| SerializableRunnable initializeClient = new CacheSerializableRunnable("Initialize Client") { |
| @Override |
| public void run2() throws CacheException { |
| numberOfAfterInvalidates = 0; |
| numberOfAfterCreates = 0; |
| numberOfAfterUpdates = 0; |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| for (int i = 0; i < numberOfKeys; i++) { |
| assertEquals("key-" + i, region.get("key-" + i)); |
| } |
| } |
| }; |
| getSystem().getLogWriter().info("before initialize client"); |
| vm2.invoke(initializeClient); |
| vm3.invoke(initializeClient); |
| |
| // Add a CacheListener to both vm2 and vm3 |
| vm2.invoke(new CacheSerializableRunnable("Add CacheListener 1") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| CacheListener listener = new CacheListenerAdapter() { |
| @Override |
| public void afterCreate(EntryEvent e) { |
| numberOfAfterCreates++; |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm2 numberOfAfterCreates: " + numberOfAfterCreates); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent e) { |
| numberOfAfterUpdates++; |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm2 numberOfAfterUpdates: " + numberOfAfterUpdates); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent e) { |
| numberOfAfterInvalidates++; |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm2 numberOfAfterInvalidates: " + numberOfAfterInvalidates); |
| } |
| }; |
| region.getAttributesMutator().addCacheListener(listener); |
| region.registerInterestRegex(".*", false, false); |
| } |
| }); |
| |
| vm3.invoke(new CacheSerializableRunnable("Add CacheListener 2") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| CacheListener listener = new CacheListenerAdapter() { |
| @Override |
| public void afterCreate(EntryEvent e) { |
| numberOfAfterCreates++; |
| // getLogWriter().info("vm3 numberOfAfterCreates: " + numberOfAfterCreates); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent e) { |
| numberOfAfterUpdates++; |
| // getLogWriter().info("vm3 numberOfAfterUpdates: " + numberOfAfterUpdates); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent e) { |
| numberOfAfterInvalidates++; |
| // getLogWriter().info("vm3 numberOfAfterInvalidates: " + numberOfAfterInvalidates); |
| } |
| }; |
| region.getAttributesMutator().addCacheListener(listener); |
| region.registerInterestRegex(".*", false, false); |
| } |
| }); |
| |
| Wait.pause(3000); |
| |
| getSystem().getLogWriter().info("before puts"); |
| // Use vm2 to put new values |
| // This should cause 10 afterUpdates to vm2 and 10 afterInvalidates to vm3 |
| vm2.invoke(new CacheSerializableRunnable("Put New Values") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| for (int i = 0; i < 10; i++) { |
| region.put("key-" + i, "key-" + i); |
| } |
| } |
| }); |
| getSystem().getLogWriter().info("after puts"); |
| |
| // Wait to make sure all the updates are received |
| Wait.pause(1000); |
| |
| long vm2AfterCreates = vm2.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterCreates()); |
| long vm2AfterUpdates = vm2.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterUpdates()); |
| long vm2AfterInvalidates = |
| vm2.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterInvalidates()); |
| long vm3AfterCreates = vm3.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterCreates()); |
| long vm3AfterUpdates = vm3.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterUpdates()); |
| long vm3AfterInvalidates = |
| vm3.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterInvalidates()); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm2AfterCreates: " + vm2AfterCreates); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm2AfterUpdates: " + vm2AfterUpdates); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm2AfterInvalidates: " + vm2AfterInvalidates); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm3AfterCreates: " + vm3AfterCreates); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm3AfterUpdates: " + vm3AfterUpdates); |
| org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() |
| .info("vm3AfterInvalidates: " + vm3AfterInvalidates); |
| |
| assertTrue("VM2 should not have received any afterCreate messages", vm2AfterCreates == 0); |
| assertTrue("VM2 should not have received any afterInvalidate messages", |
| vm2AfterInvalidates == 0); |
| assertTrue("VM2 received " + vm2AfterUpdates + " afterUpdate messages. It should have received " |
| + numberOfKeys, vm2AfterUpdates == numberOfKeys); |
| |
| assertTrue("VM3 should not have received any afterCreate messages", vm3AfterCreates == 0); |
| assertTrue("VM3 should not have received any afterUpdate messages", vm3AfterUpdates == 0); |
| assertTrue( |
| "VM3 received " + vm3AfterInvalidates |
| + " afterInvalidate messages. It should have received " + numberOfKeys, |
| vm3AfterInvalidates == numberOfKeys); |
| } |
| |
| /** |
| * Test that the "notify by subscription" attribute is unique for each BridgeServer and Gateway |
| * |
| */ |
| /* |
| * public void test035NotifyBySubscriptionIsolation() throws Exception { final String name = |
| * this.getName(); final Host host = Host.getHost(0); final VM server = host.getVM(3); final VM |
| * client1 = host.getVM(1); final VM client2 = host.getVM(2); |
| * |
| * final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); final int bs1Port = |
| * ports[0]; final int bs2Port = ports[1]; final int gwPort = ports[2]; |
| * |
| * final String key1 = "key1-" + name; final String val1 = "val1-" + name; final String key2 = |
| * "key2-" + name; final String val2 = "val2-" + name; |
| * |
| * try { server.invoke(new CacheSerializableRunnable("Setup BridgeServers and Gateway") { public |
| * void run2() throws CacheException { Cache cache = getCache(); |
| * |
| * try { |
| * |
| * // Create a gateway (which sets notify-by-subscription to true) cache.setGatewayHub(name, |
| * gwPort).start(); |
| * |
| * // Start the server that does not have notify-by-subscription (server2) CacheServer bridge2 = |
| * cache.addCacheServer(); bridge2.setPort(bs2Port); bridge2.setNotifyBySubscription(false); |
| * String[] noNotifyGroup = {"noNotifyGroup"}; bridge2.setGroups(noNotifyGroup); bridge2.start(); |
| * assertFalse(bridge2.getNotifyBySubscription()); { BridgeServerImpl bsi = (BridgeServerImpl) |
| * bridge2; AcceptorImpl aci = bsi.getAcceptor(); |
| * |
| * //assertFalse(aci.getCacheClientNotifier().getNotifyBySubscription()); } |
| * |
| * // Start the server that DOES have notify-by-subscription (server1) CacheServer bridge1 = |
| * cache.addCacheServer(); bridge1.setPort(bs1Port); bridge1.setNotifyBySubscription(true); |
| * String[] notifyGroup = {"notifyGroup"}; bridge1.setGroups(notifyGroup); bridge1.start(); |
| * assertTrue(bridge1.getNotifyBySubscription()); { BridgeServerImpl bsi = (BridgeServerImpl) |
| * bridge1; AcceptorImpl aci = bsi.getAcceptor(); |
| * assertTrue(aci.getCacheClientNotifier().getNotifyBySubscription()); } |
| * |
| * } catch (IOException ioe) { fail("Setup of BridgeServer test " + name + " failed", ioe ); } |
| * |
| * Region r = createRootRegion(name, getRegionAttributes()); r.put(key1, val1); } }); |
| * |
| * client1.invoke(new |
| * CacheSerializableRunnable("Test client1 to server with true notify-by-subscription") { public |
| * void run2() throws CacheException { createLonerDS(); AttributesFactory factory = new |
| * AttributesFactory(); factory.setScope(Scope.LOCAL); |
| * ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs1Port,-1,true,-1 |
| * ,-1, "notifyGroup"); factory.setCacheListener(new |
| * CertifiableTestCacheListener(getLogWriter())); Region r = createRootRegion(name, |
| * factory.create()); assertNull(r.getEntry(key1)); r.registerInterest(key1); |
| * assertNotNull(r.getEntry(key1)); assertIndexDetailsEquals(val1, r.getEntry(key1).getValue()); |
| * r.registerInterest(key2); assertNull(r.getEntry(key2)); } }); |
| * |
| * client2.invoke(new |
| * CacheSerializableRunnable("Test client2 to server with false notify-by-subscription") { public |
| * void run2() throws CacheException { createLonerDS(); AttributesFactory factory = new |
| * AttributesFactory(); |
| * ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs2Port,-1,true,-1 |
| * ,-1, "noNotifyGroup"); |
| * |
| * factory.setScope(Scope.LOCAL); factory.setCacheListener(new |
| * CertifiableTestCacheListener(getLogWriter())); Region r = createRootRegion(name, |
| * factory.create()); assertNull(r.getEntry(key1)); assertIndexDetailsEquals(val1, r.get(key1)); |
| * assertNull(r.getEntry(key2)); r.registerInterest(key2); assertNull(r.getEntry(key2)); } }); |
| * |
| * server.invoke(new |
| * CacheSerializableRunnable("Update server with new values for client notification") { public |
| * void run2() throws CacheException { Region r = getRootRegion(name); assertNotNull(r); |
| * r.put(key2, val2); // Create a new entry r.put(key1, val2); // Change the first entry } }); |
| * |
| * client1.invoke(new |
| * CacheSerializableRunnable("Test update from to server with true notify-by-subscription") { |
| * public void run2() throws CacheException { Region r = getRootRegion(name); assertNotNull(r); |
| * CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) |
| * r.getAttributes().getCacheListener(); |
| * |
| * ctl.waitForUpdated(key1); assertNotNull(r.getEntry(key1)); assertIndexDetailsEquals(val2, |
| * r.getEntry(key1).getValue()); // new value should have been pushed |
| * |
| * ctl.waitForCreated(key2); assertNotNull(r.getEntry(key2)); // new entry should have been pushed |
| * assertIndexDetailsEquals(val2, r.getEntry(key2).getValue()); } }); |
| * |
| * client2.invoke(new |
| * CacheSerializableRunnable("Test update from server with false notify-by-subscription") { public |
| * void run2() throws CacheException { Region r = getRootRegion(name); assertNotNull(r); |
| * CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) |
| * r.getAttributes().getCacheListener(); ctl.waitForInvalidated(key1); |
| * assertNotNull(r.getEntry(key1)); assertNull(r.getEntry(key1).getValue()); // Invalidate should |
| * have been pushed assertIndexDetailsEquals(val2, r.get(key1)); // New value should be fetched |
| * |
| * assertNull(r.getEntry(key2)); // assertNull(r.getEntry(key2).getValue()); |
| * assertIndexDetailsEquals(val2, r.get(key2)); // New entry should be fetched } }); tearDown(); } |
| * finally { // HashSet destroyedRoots = new HashSet(); try { client1.invoke(() -> |
| * CacheTestCase.remoteTearDown()); client1.invoke(() -> disconnectFromDS()); } finally { |
| * client2.invoke(() -> CacheTestCase.remoteTearDown()); client2.invoke(() -> disconnectFromDS()); |
| * } } } |
| */ |
| |
| |
| |
| // disabled - per Sudhir we don't support multiple bridges in the same VM |
| // public void test0362BridgeServersWithDiffGroupsInSameVM() throws Exception { |
| // final String name = this.getName(); |
| // final Host host = Host.getHost(0); |
| // final VM server = host.getVM(3); |
| // final VM client1 = host.getVM(1); |
| // final VM client2 = host.getVM(2); |
| // |
| // final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); |
| // final int bs1Port = ports[0]; |
| // final int bs2Port = ports[1]; |
| // |
| // try { |
| // server.invoke(new CacheSerializableRunnable("Setup BridgeServers and Gateway") { |
| // public void run2() throws CacheException |
| // { |
| // Cache cache = getCache(); |
| // |
| // try { |
| // |
| // // Start server in group 1 |
| // CacheServer bridge1 = cache.addCacheServer(); |
| // bridge1.setPort(bs1Port); |
| // String[] group1 = {"zGroup1"}; |
| // bridge1.setGroups(group1); |
| // bridge1.start(); |
| // |
| // // start server in group 2 |
| // CacheServer bridge2 = cache.addCacheServer(); |
| // bridge2.setPort(bs2Port); |
| // bridge2.setNotifyBySubscription(true); |
| // String[] group2 = {"zGroup2"}; |
| // bridge2.setGroups(group2); |
| // bridge2.start(); |
| // getLogWriter().info("zGroup1 port should be "+bs1Port+" zGroup2 port should be "+bs2Port); |
| // } catch (IOException ioe) { |
| // fail("Setup of BridgeServer test " + name + " failed", ioe ); |
| // } |
| // |
| // createRootRegion(name, getRegionAttributes()); |
| // } |
| // }); |
| // |
| // client1.invoke(new CacheSerializableRunnable("Test client1 to zGroup2") { |
| // public void run2() throws CacheException |
| // { |
| // createLonerDS(); |
| // AttributesFactory factory = new AttributesFactory(); |
| // factory.setScope(Scope.LOCAL); |
| // ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs1Port,-1,true,-1,-1, |
| // "zGroup2"); |
| // Region r = createRootRegion(name, factory.create()); |
| // r.registerInterest("whatever"); |
| // } |
| // }); |
| // |
| // client2.invoke(new CacheSerializableRunnable("Test client2 to zGroup1") { |
| // public void run2() throws CacheException |
| // { |
| // createLonerDS(); |
| // AttributesFactory factory = new AttributesFactory(); |
| // ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs2Port,-1,true,-1,-1, |
| // "zGroup1"); |
| // |
| // factory.setScope(Scope.LOCAL); |
| // Region r = createRootRegion(name, factory.create()); |
| // r.registerInterest("whatever"); |
| // } |
| // }); |
| // |
| // tearDown(); |
| // } finally { |
| // try { |
| // client1.invoke(() -> CacheTestCase.remoteTearDown()); |
| // client1.invoke(() -> disconnectFromDS()); |
| // } finally { |
| // client2.invoke(() -> CacheTestCase.remoteTearDown()); |
| // client2.invoke(() -> disconnectFromDS()); |
| // } |
| // } |
| // } |
| |
| public static class DelayListener extends CacheListenerAdapter { |
| private final int delay; |
| |
| public DelayListener(int delay) { |
| this.delay = delay; |
| } |
| |
| private void delay() { |
| try { |
| Thread.sleep(this.delay); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| } |
| |
| @Override |
| public void afterCreate(EntryEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterRegionDestroy(RegionEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterRegionCreate(RegionEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterRegionInvalidate(RegionEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterRegionClear(RegionEvent event) { |
| delay(); |
| } |
| |
| @Override |
| public void afterRegionLive(RegionEvent event) { |
| delay(); |
| } |
| } |
| |
| /** |
| * Make sure a tx done in a server on an empty region gets sent to clients who have registered |
| * interest. |
| */ |
| @Test |
| public void test037Bug39526part1() throws CacheException, InterruptedException { |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| |
| // Create the cache servers with distributed, empty region |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setConcurrencyChecksEnabled(false); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| } |
| }; |
| getSystem().getLogWriter().info("before create server"); |
| vm0.invoke(createServer); |
| |
| // Create cache server client |
| final String host0 = NetworkUtils.getServerHostName(host); |
| final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| SerializableRunnable createClient = |
| new CacheSerializableRunnable("Create Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| // create the region |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, -1, true, -1, -1, |
| null); |
| createRegion(name, factory.create()); |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.registerInterestRegex(".*"); |
| } |
| }; |
| getSystem().getLogWriter().info("before create client"); |
| vm1.invoke(createClient); |
| |
| // now do a tx in the server |
| SerializableRunnable doServerTx = new CacheSerializableRunnable("doServerTx") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| Cache cache = getCache(); |
| CacheTransactionManager txmgr = cache.getCacheTransactionManager(); |
| txmgr.begin(); |
| try { |
| region.put("k1", "v1"); |
| region.put("k2", "v2"); |
| region.put("k3", "v3"); |
| } finally { |
| txmgr.commit(); |
| } |
| } |
| }; |
| getSystem().getLogWriter().info("before doServerTx"); |
| vm0.invoke(doServerTx); |
| |
| // now verify that the client receives the committed data |
| SerializableRunnable validateClient = |
| new CacheSerializableRunnable("Validate Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| final LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // wait for a while for us to have the correct number of entries |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return region.size() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for region to be size 3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| // assertIndexDetailsEquals(3, region.size()); |
| assertTrue(region.containsKey("k1")); |
| assertTrue(region.containsKey("k2")); |
| assertTrue(region.containsKey("k3")); |
| assertEquals("v1", region.getEntry("k1").getValue()); |
| assertEquals("v2", region.getEntry("k2").getValue()); |
| assertEquals("v3", region.getEntry("k3").getValue()); |
| } |
| }; |
| getSystem().getLogWriter().info("before confirmCommitOnClient"); |
| vm1.invoke(validateClient); |
| } |
| |
| /** |
| * Now confirm that a tx done in a peer of a server (the server having an empty region and wanting |
| * all events) sends the tx to its clients |
| */ |
| @Test |
| public void test038Bug39526part2() throws CacheException, InterruptedException { |
| disconnectAllFromDS(); |
| final String name = this.getName(); |
| final Host host = Host.getHost(0); |
| VM vm0 = host.getVM(0); |
| VM vm1 = host.getVM(1); |
| VM vm2 = host.getVM(2); |
| |
| // Create the cache servers with distributed, empty region |
| SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setConcurrencyChecksEnabled(false); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
| createRegion(name, factory.create()); |
| // pause(1000); |
| try { |
| startBridgeServer(0); |
| } catch (Exception ex) { |
| org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex); |
| } |
| } |
| }; |
| getSystem().getLogWriter().info("before create server"); |
| vm0.invoke(createServer); |
| |
| // Create cache server client |
| final String host0 = NetworkUtils.getServerHostName(host); |
| final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort()); |
| SerializableRunnable createClient = |
| new CacheSerializableRunnable("Create Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| getLonerSystem(); |
| // create the region |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.LOCAL); |
| factory.setConcurrencyChecksEnabled(false); |
| // create bridge writer |
| ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, -1, true, -1, -1, |
| null); |
| createRegion(name, factory.create()); |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| region.registerInterestRegex(".*"); |
| } |
| }; |
| getSystem().getLogWriter().info("before create client"); |
| vm1.invoke(createClient); |
| |
| SerializableRunnable createServerPeer = new CacheSerializableRunnable("Create Server Peer") { |
| @Override |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setConcurrencyChecksEnabled(false); |
| createRegion(name, factory.create()); |
| } |
| }; |
| getSystem().getLogWriter().info("before create server peer"); |
| vm2.invoke(createServerPeer); |
| |
| // now do a tx in the server |
| SerializableRunnable doServerTx = new CacheSerializableRunnable("doServerTx") { |
| @Override |
| public void run2() throws CacheException { |
| LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| Cache cache = getCache(); |
| CacheTransactionManager txmgr = cache.getCacheTransactionManager(); |
| txmgr.begin(); |
| try { |
| region.put("k1", "v1"); |
| region.put("k2", "v2"); |
| region.put("k3", "v3"); |
| } finally { |
| txmgr.commit(); |
| } |
| } |
| }; |
| getSystem().getLogWriter().info("before doServerTx"); |
| vm2.invoke(doServerTx); |
| |
| // @todo verify server received it but to do this need a listener in |
| // the server |
| |
| // now verify that the client receives the committed data |
| SerializableRunnable validateClient = |
| new CacheSerializableRunnable("Validate Cache Server Client") { |
| @Override |
| public void run2() throws CacheException { |
| final LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name); |
| // wait for a while for us to have the correct number of entries |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return region.size() == 3; |
| } |
| |
| @Override |
| public String description() { |
| return "waiting for region to be size 3"; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| // assertIndexDetailsEquals(3, region.size()); |
| assertTrue(region.containsKey("k1")); |
| assertTrue(region.containsKey("k2")); |
| assertTrue(region.containsKey("k3")); |
| assertEquals("v1", region.getEntry("k1").getValue()); |
| assertEquals("v2", region.getEntry("k2").getValue()); |
| assertEquals("v3", region.getEntry("k3").getValue()); |
| } |
| }; |
| getSystem().getLogWriter().info("before confirmCommitOnClient"); |
| vm1.invoke(validateClient); |
| disconnectAllFromDS(); |
| } |
| |
| static class Order implements DataSerializable { |
| int index; |
| |
| public Order() {} |
| |
| public void init(int index) { |
| this.index = index; |
| } |
| |
| public int getIndex() { |
| return index; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| out.writeInt(index); |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| index = in.readInt(); |
| } |
| } |
| } |