| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.tier.sockets; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.util.Properties; |
| import java.util.concurrent.CountDownLatch; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.InterestPolicy; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.SubscriptionAttributes; |
| import org.apache.geode.cache.client.NoAvailableServersException; |
| import org.apache.geode.cache.client.PoolManager; |
| import org.apache.geode.cache.client.internal.Connection; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.cache.util.CacheListenerAdapter; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.internal.AvailablePort; |
| import org.apache.geode.internal.cache.AbstractRegionMap; |
| import org.apache.geode.internal.cache.ClientServerObserverAdapter; |
| import org.apache.geode.internal.cache.ClientServerObserverHolder; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| |
| /** |
| * Tests client server FORCE_INVALIDATE |
| */ |
| @Category({ClientServerTest.class}) |
| public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static Region<String, String> region1; |
| |
| private static final String REGION_NAME1 = "ClientServerForceInvalidateDUnitTest_region1"; |
| |
| private static Host host; |
| |
| private static VM server1; |
| private static VM server2; |
| |
| @Override |
| public final void postSetUp() throws Exception { |
| host = Host.getHost(0); |
| server1 = host.getVM(0); |
| server2 = host.getVM(1); |
| } |
| |
| private int initServerCache(VM vm, boolean concurrencyChecksEnabled, boolean partitioned) { |
| return vm.invoke(() -> createServerCache(concurrencyChecksEnabled, partitioned, 0)); |
| } |
| |
| @Test |
| public void testForceInvalidateOnCachingProxyWithConcurrencyChecks() throws Exception { |
| dotestForceInvalidate(true, true, false, true); |
| } |
| |
| @Test |
| public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnServer() |
| throws Exception { |
| dotestForceInvalidate(true, false, false, true); |
| } |
| |
| @Test |
| public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnClient() |
| throws Exception { |
| dotestForceInvalidate(false, true, false, true); |
| } |
| |
| @Test |
| public void testForceInvalidateOnProxyWithConcurrencyChecks() throws Exception { |
| dotestForceInvalidate(true, true, true, true); |
| } |
| |
| @Test |
| public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnServer() throws Exception { |
| dotestForceInvalidate(true, false, true, true); |
| } |
| |
| @Test |
| public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnClient() throws Exception { |
| dotestForceInvalidate(false, true, true, true); |
| } |
| |
| @Test |
| public void testForceInvalidateOnCachingProxyWithConcurrencyChecksServerReplicated() |
| throws Exception { |
| dotestForceInvalidate(true, true, false, false); |
| } |
| |
| @Test |
| public void testForceInvalidateOnProxyWithConcurrencyChecksServerReplicated() throws Exception { |
| dotestForceInvalidate(true, true, true, false); |
| } |
| |
| /** |
| * 1. create an entry 2. Install a observer to pause sending subscription events to the client 3. |
| * invalidate the entry from the server (it will be done on server but pause prevents it from |
| * being sent to the client). 4. verify that afterInvalidate was invoked on the server. 5. change |
| * the same entry (do a put). Both the client and server now have the latest version which is this |
| * update. 6. unpause the observer so that it now sends invalidate event to client. It will arrive |
| * late and not be done because of concurrency checks. 7. verify that afterInvalidate was invoked |
| * on the client. |
| */ |
| @Test |
| public void testInvalidateLosingOnConcurrencyChecks() throws Exception { |
| try { |
| setupServerAndClientVMs(true, true, false, false); |
| final String key = "delayInvalidate"; |
| region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false); |
| region1.put(key, "1000"); |
| logger.info("installing observers"); |
| server1.invoke(() -> installObserver()); |
| server2.invoke(() -> installObserver()); |
| |
| server2.invoke(() -> invalidateOnServer(key)); |
| |
| validateServerListenerInvoked(); |
| |
| logger.info("putting a new value 1001"); |
| region1.put(key, "1001"); |
| logger.info("UnPausing observers"); |
| server1.invoke(() -> unpauseObserver()); |
| server2.invoke(() -> unpauseObserver()); |
| |
| waitForClientInvalidate(); |
| |
| } finally { |
| server1.invoke(() -> cleanupObserver()); |
| server2.invoke(() -> cleanupObserver()); |
| } |
| } |
| |
| private static void installObserver() { |
| CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true; |
| ClientServerObserverHolder.setInstance(new DelaySendingEvent()); |
| } |
| |
| private static void unpauseObserver() { |
| DelaySendingEvent observer = (DelaySendingEvent) ClientServerObserverHolder.getInstance(); |
| observer.latch.countDown(); |
| } |
| |
| private static void cleanupObserver() { |
| CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false; |
| ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter()); |
| } |
| |
| private static void invalidateOnServer(final Object key) { |
| Region<?, ?> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1); |
| r.invalidate(key); |
| } |
| |
| private static void createOnServer(final Object key, final Object value) { |
| @SuppressWarnings("unchecked") |
| Region<Object, Object> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1); |
| r.create(key, value); |
| } |
| |
| private void waitForClientInvalidate() { |
| await() |
| .until(() -> hasClientListenerAfterInvalidateBeenInvoked()); |
| } |
| |
| static class DelaySendingEvent extends ClientServerObserverAdapter { |
| CountDownLatch latch = new CountDownLatch(1); |
| |
| @Override |
| public void afterMessageCreation(Message msg) { |
| try { |
| logger.info("waiting in DelaySendingEvent..."); |
| latch.await(); |
| logger.info("finished waiting in DelaySendingEvent"); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| |
| /** |
| * 1. Invalidate a non-existent entry from the server. 2. Validate that the servers see after |
| * invalidate. 3. Validate that the subscribed client invokes after invalidate. |
| */ |
| private void dotestForceInvalidate(boolean concurrencyChecksOnServer, |
| boolean concurrencyChecksOnClient, boolean clientEmpty, boolean serverPartitioned) |
| throws Exception { |
| setupServerAndClientVMs(concurrencyChecksOnServer, concurrencyChecksOnClient, clientEmpty, |
| serverPartitioned); |
| |
| server2.invoke(() -> createOnServer("key", "value")); |
| region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false); |
| server2.invoke(() -> invalidateOnServer("key")); |
| |
| validateServerListenerInvoked(); |
| waitForClientInvalidate(); |
| } |
| |
| private void setupServerAndClientVMs(boolean concurrencyChecksOnServer, |
| boolean concurrencyChecksOnClient, boolean clientEmpty, boolean serverPartitioned) |
| throws Exception { |
| int port1 = initServerCache(server1, concurrencyChecksOnServer, serverPartitioned); // vm0 |
| int port2 = initServerCache(server2, concurrencyChecksOnServer, serverPartitioned); // vm1 |
| String serverName = NetworkUtils.getServerHostName(Host.getHost(0)); |
| createClientCache(serverName, port1, port2, clientEmpty, concurrencyChecksOnClient); |
| logger.info("testing force invalidate on on client"); |
| } |
| |
| private void validateServerListenerInvoked() { |
| boolean listenerInvoked = |
| server1.invoke(() -> validateOnServer()) || server2.invoke(() -> validateOnServer()); |
| assertTrue(listenerInvoked); |
| } |
| |
| private static boolean validateOnServer() { |
| Region<?, ?> region = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1); |
| CacheListener<?, ?>[] listeners = region.getAttributes().getCacheListeners(); |
| for (CacheListener<?, ?> listener : listeners) { |
| if (listener instanceof ServerListener) { |
| ServerListener serverListener = (ServerListener) listener; |
| if (serverListener.afterInvalidateInvoked) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| private boolean hasClientListenerAfterInvalidateBeenInvoked() { |
| Region<?, ?> region = getCache().getRegion(REGION_NAME1); |
| CacheListener<?, ?>[] listeners = region.getAttributes().getCacheListeners(); |
| for (CacheListener<?, ?> listener : listeners) { |
| if (listener instanceof ClientListener) { |
| ClientListener clientListener = (ClientListener) listener; |
| if (clientListener.afterInvalidateInvoked) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| private static Integer createServerCache(Boolean concurrencyChecksEnabled, Boolean partitioned, |
| Integer maxThreads) throws Exception { |
| AbstractRegionMap.FORCE_INVALIDATE_EVENT = true; |
| Properties props = new Properties(); |
| Cache cache = new ClientServerForceInvalidateDUnitTest().createCacheV(props); |
| RegionFactory<String, String> factory = cache.createRegionFactory(); |
| if (partitioned) { |
| factory.setDataPolicy(DataPolicy.PARTITION); |
| factory.setPartitionAttributes(new PartitionAttributesFactory<String, String>() |
| .setRedundantCopies(0).setTotalNumBuckets(251).create()); |
| } else { |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| } |
| factory.setConcurrencyChecksEnabled(concurrencyChecksEnabled); |
| factory.addCacheListener(new ServerListener()); |
| Region<String, String> r1 = factory.create(REGION_NAME1); |
| assertNotNull(r1); |
| |
| CacheServer server = cache.addCacheServer(); |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| logger.info("Starting server on port " + port); |
| server.setPort(port); |
| server.setMaxThreads(maxThreads.intValue()); |
| server.start(); |
| logger.info("Started server on port " + server.getPort()); |
| return new Integer(server.getPort()); |
| |
| } |
| |
| public static void createClientCache(String h, int port1, int port2, boolean empty, |
| boolean concurrenctChecksEnabled) throws Exception { |
| AbstractRegionMap.FORCE_INVALIDATE_EVENT = true; |
| Properties props = new Properties(); |
| props.setProperty(MCAST_PORT, "0"); |
| props.setProperty(LOCATORS, ""); |
| Cache cache = new ClientServerForceInvalidateDUnitTest().createCacheV(props); |
| PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(h, port1).addServer(h, port2) |
| .setSubscriptionEnabled(true).setReadTimeout(1000) |
| .setSocketBufferSize(32768).setMinConnections(3).setSubscriptionRedundancy(-1) |
| .setPingInterval(2000).create("ClientServerForceInvalidateDUnitTestPool"); |
| |
| RegionFactory<String, String> factory = cache.createRegionFactory(); |
| if (empty) { |
| factory.setDataPolicy(DataPolicy.EMPTY); |
| factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); |
| } else { |
| factory.setDataPolicy(DataPolicy.NORMAL); |
| } |
| factory.setPoolName(p.getName()); |
| factory.setConcurrencyChecksEnabled(concurrenctChecksEnabled); |
| region1 = factory.create(REGION_NAME1); |
| region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false); |
| region1.getAttributesMutator().addCacheListener(new ClientListener()); |
| assertNotNull(region1); |
| await().until(() -> poolReady(p)); |
| } |
| |
| private static boolean poolReady(final PoolImpl pool) { |
| try { |
| Connection conn = pool.acquireConnection(); |
| if (conn == null) { |
| // excuse = "acquireConnection returned null?"; |
| return false; |
| } |
| return true; |
| } catch (NoAvailableServersException e) { |
| // excuse = "Cannot find a server: " + e; |
| return false; |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| private Cache createCacheV(Properties props) throws Exception { |
| DistributedSystem ds = getSystem(props); |
| assertNotNull(ds); |
| ds.disconnect(); |
| ds = getSystem(props); |
| Cache cache = getCache(); |
| assertNotNull(cache); |
| return cache; |
| } |
| |
| static class ClientListener extends CacheListenerAdapter<String, String> { |
| public boolean afterInvalidateInvoked; |
| |
| @Override |
| public void afterCreate(EntryEvent<String, String> event) { |
| super.afterCreate(event); |
| logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" |
| + System.currentTimeMillis()); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent<String, String> event) { |
| super.afterUpdate(event); |
| logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" |
| + System.currentTimeMillis()); |
| } |
| |
| @Override |
| public void afterInvalidate(final EntryEvent<String, String> event) { |
| super.afterInvalidate(event); |
| afterInvalidateInvoked = true; |
| String prefix = ""; |
| if (!event.isOriginRemote()) { |
| prefix = " "; |
| } |
| logger.info(prefix + "afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue() |
| + "} at=" + System.currentTimeMillis()); |
| } |
| } |
| |
| static class ServerListener extends CacheListenerAdapter<String, String> { |
| boolean afterInvalidateInvoked; |
| |
| @Override |
| public void afterCreate(EntryEvent<String, String> event) { |
| super.afterCreate(event); |
| logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" |
| + System.currentTimeMillis()); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent<String, String> event) { |
| super.afterUpdate(event); |
| logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" |
| + System.currentTimeMillis()); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent<String, String> event) { |
| super.afterInvalidate(event); |
| afterInvalidateInvoked = true; |
| logger.info("afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue() |
| + "} at=" + System.currentTimeMillis()); |
| } |
| } |
| |
| @Override |
| public final void postTearDownCacheTestCase() throws Exception { |
| // close the clients first |
| closeForceInvalidateCache(); |
| // then close the servers |
| server1.invoke(() -> closeForceInvalidateCache()); |
| server2.invoke(() -> closeForceInvalidateCache()); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private static void closeForceInvalidateCache() { |
| AbstractRegionMap.FORCE_INVALIDATE_EVENT = false; |
| Cache cache = new ClientServerForceInvalidateDUnitTest().getCache(); |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().disconnect(); |
| } |
| } |
| |
| } |