blob: 11f28f5d7d1e5864b295f057353ab51dd55dca50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.AbstractRegionMap;
import org.apache.geode.internal.cache.ClientServerObserverAdapter;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* Tests client server FORCE_INVALIDATE
*/
@Category({ClientServerTest.class})
public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
private static final Logger logger = LogService.getLogger();
private static Region<String, String> region1;
private static final String REGION_NAME1 = "ClientServerForceInvalidateDUnitTest_region1";
private static Host host;
private static VM server1;
private static VM server2;
@Override
public final void postSetUp() throws Exception {
host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
}
private int initServerCache(VM vm, boolean concurrencyChecksEnabled, boolean partitioned) {
return vm.invoke(() -> createServerCache(concurrencyChecksEnabled, partitioned, 0));
}
@Test
public void testForceInvalidateOnCachingProxyWithConcurrencyChecks() throws Exception {
dotestForceInvalidate(true, true, false, true);
}
@Test
public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnServer()
throws Exception {
dotestForceInvalidate(true, false, false, true);
}
@Test
public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnClient()
throws Exception {
dotestForceInvalidate(false, true, false, true);
}
@Test
public void testForceInvalidateOnProxyWithConcurrencyChecks() throws Exception {
dotestForceInvalidate(true, true, true, true);
}
@Test
public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnServer() throws Exception {
dotestForceInvalidate(true, false, true, true);
}
@Test
public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnClient() throws Exception {
dotestForceInvalidate(false, true, true, true);
}
@Test
public void testForceInvalidateOnCachingProxyWithConcurrencyChecksServerReplicated()
throws Exception {
dotestForceInvalidate(true, true, false, false);
}
@Test
public void testForceInvalidateOnProxyWithConcurrencyChecksServerReplicated() throws Exception {
dotestForceInvalidate(true, true, true, false);
}
/**
* 1. create an entry 2. Install a observer to pause sending subscription events to the client 3.
* invalidate the entry from the server (it will be done on server but pause prevents it from
* being sent to the client). 4. verify that afterInvalidate was invoked on the server. 5. change
* the same entry (do a put). Both the client and server now have the latest version which is this
* update. 6. unpause the observer so that it now sends invalidate event to client. It will arrive
* late and not be done because of concurrency checks. 7. verify that afterInvalidate was invoked
* on the client.
*/
@Test
public void testInvalidateLosingOnConcurrencyChecks() throws Exception {
try {
setupServerAndClientVMs(true, true, false, false);
final String key = "delayInvalidate";
region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
region1.put(key, "1000");
logger.info("installing observers");
server1.invoke(() -> installObserver());
server2.invoke(() -> installObserver());
server2.invoke(() -> invalidateOnServer(key));
validateServerListenerInvoked();
logger.info("putting a new value 1001");
region1.put(key, "1001");
logger.info("UnPausing observers");
server1.invoke(() -> unpauseObserver());
server2.invoke(() -> unpauseObserver());
waitForClientInvalidate();
} finally {
server1.invoke(() -> cleanupObserver());
server2.invoke(() -> cleanupObserver());
}
}
private static void installObserver() {
CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
ClientServerObserverHolder.setInstance(new DelaySendingEvent());
}
private static void unpauseObserver() {
DelaySendingEvent observer = (DelaySendingEvent) ClientServerObserverHolder.getInstance();
observer.latch.countDown();
}
private static void cleanupObserver() {
CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
}
private static void invalidateOnServer(final Object key) {
Region<?, ?> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
r.invalidate(key);
}
private static void createOnServer(final Object key, final Object value) {
@SuppressWarnings("unchecked")
Region<Object, Object> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
r.create(key, value);
}
private void waitForClientInvalidate() {
await()
.until(() -> hasClientListenerAfterInvalidateBeenInvoked());
}
static class DelaySendingEvent extends ClientServerObserverAdapter {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void afterMessageCreation(Message msg) {
try {
logger.info("waiting in DelaySendingEvent...");
latch.await();
logger.info("finished waiting in DelaySendingEvent");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 1. Invalidate a non-existent entry from the server. 2. Validate that the servers see after
* invalidate. 3. Validate that the subscribed client invokes after invalidate.
*/
private void dotestForceInvalidate(boolean concurrencyChecksOnServer,
boolean concurrencyChecksOnClient, boolean clientEmpty, boolean serverPartitioned)
throws Exception {
setupServerAndClientVMs(concurrencyChecksOnServer, concurrencyChecksOnClient, clientEmpty,
serverPartitioned);
server2.invoke(() -> createOnServer("key", "value"));
region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
server2.invoke(() -> invalidateOnServer("key"));
validateServerListenerInvoked();
waitForClientInvalidate();
}
private void setupServerAndClientVMs(boolean concurrencyChecksOnServer,
boolean concurrencyChecksOnClient, boolean clientEmpty, boolean serverPartitioned)
throws Exception {
int port1 = initServerCache(server1, concurrencyChecksOnServer, serverPartitioned); // vm0
int port2 = initServerCache(server2, concurrencyChecksOnServer, serverPartitioned); // vm1
String serverName = NetworkUtils.getServerHostName(Host.getHost(0));
createClientCache(serverName, port1, port2, clientEmpty, concurrencyChecksOnClient);
logger.info("testing force invalidate on on client");
}
private void validateServerListenerInvoked() {
boolean listenerInvoked =
server1.invoke(() -> validateOnServer()) || server2.invoke(() -> validateOnServer());
assertTrue(listenerInvoked);
}
private static boolean validateOnServer() {
Region<?, ?> region = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
CacheListener<?, ?>[] listeners = region.getAttributes().getCacheListeners();
for (CacheListener<?, ?> listener : listeners) {
if (listener instanceof ServerListener) {
ServerListener serverListener = (ServerListener) listener;
if (serverListener.afterInvalidateInvoked) {
return true;
}
}
}
return false;
}
private boolean hasClientListenerAfterInvalidateBeenInvoked() {
Region<?, ?> region = getCache().getRegion(REGION_NAME1);
CacheListener<?, ?>[] listeners = region.getAttributes().getCacheListeners();
for (CacheListener<?, ?> listener : listeners) {
if (listener instanceof ClientListener) {
ClientListener clientListener = (ClientListener) listener;
if (clientListener.afterInvalidateInvoked) {
return true;
}
}
}
return false;
}
private static Integer createServerCache(Boolean concurrencyChecksEnabled, Boolean partitioned,
Integer maxThreads) throws Exception {
AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
Properties props = new Properties();
Cache cache = new ClientServerForceInvalidateDUnitTest().createCacheV(props);
RegionFactory<String, String> factory = cache.createRegionFactory();
if (partitioned) {
factory.setDataPolicy(DataPolicy.PARTITION);
factory.setPartitionAttributes(new PartitionAttributesFactory<String, String>()
.setRedundantCopies(0).setTotalNumBuckets(251).create());
} else {
factory.setDataPolicy(DataPolicy.REPLICATE);
}
factory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
factory.addCacheListener(new ServerListener());
Region<String, String> r1 = factory.create(REGION_NAME1);
assertNotNull(r1);
CacheServer server = cache.addCacheServer();
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
logger.info("Starting server on port " + port);
server.setPort(port);
server.setMaxThreads(maxThreads.intValue());
server.start();
logger.info("Started server on port " + server.getPort());
return new Integer(server.getPort());
}
public static void createClientCache(String h, int port1, int port2, boolean empty,
boolean concurrenctChecksEnabled) throws Exception {
AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
Cache cache = new ClientServerForceInvalidateDUnitTest().createCacheV(props);
PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(h, port1).addServer(h, port2)
.setSubscriptionEnabled(true).setReadTimeout(1000)
.setSocketBufferSize(32768).setMinConnections(3).setSubscriptionRedundancy(-1)
.setPingInterval(2000).create("ClientServerForceInvalidateDUnitTestPool");
RegionFactory<String, String> factory = cache.createRegionFactory();
if (empty) {
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
} else {
factory.setDataPolicy(DataPolicy.NORMAL);
}
factory.setPoolName(p.getName());
factory.setConcurrencyChecksEnabled(concurrenctChecksEnabled);
region1 = factory.create(REGION_NAME1);
region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
region1.getAttributesMutator().addCacheListener(new ClientListener());
assertNotNull(region1);
await().until(() -> poolReady(p));
}
private static boolean poolReady(final PoolImpl pool) {
try {
Connection conn = pool.acquireConnection();
if (conn == null) {
// excuse = "acquireConnection returned null?";
return false;
}
return true;
} catch (NoAvailableServersException e) {
// excuse = "Cannot find a server: " + e;
return false;
}
}
@SuppressWarnings("deprecation")
private Cache createCacheV(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
assertNotNull(ds);
ds.disconnect();
ds = getSystem(props);
Cache cache = getCache();
assertNotNull(cache);
return cache;
}
static class ClientListener extends CacheListenerAdapter<String, String> {
public boolean afterInvalidateInvoked;
@Override
public void afterCreate(EntryEvent<String, String> event) {
super.afterCreate(event);
logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at="
+ System.currentTimeMillis());
}
@Override
public void afterUpdate(EntryEvent<String, String> event) {
super.afterUpdate(event);
logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at="
+ System.currentTimeMillis());
}
@Override
public void afterInvalidate(final EntryEvent<String, String> event) {
super.afterInvalidate(event);
afterInvalidateInvoked = true;
String prefix = "";
if (!event.isOriginRemote()) {
prefix = " ";
}
logger.info(prefix + "afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
}
}
static class ServerListener extends CacheListenerAdapter<String, String> {
boolean afterInvalidateInvoked;
@Override
public void afterCreate(EntryEvent<String, String> event) {
super.afterCreate(event);
logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at="
+ System.currentTimeMillis());
}
@Override
public void afterUpdate(EntryEvent<String, String> event) {
super.afterUpdate(event);
logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at="
+ System.currentTimeMillis());
}
@Override
public void afterInvalidate(EntryEvent<String, String> event) {
super.afterInvalidate(event);
afterInvalidateInvoked = true;
logger.info("afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
}
}
@Override
public final void postTearDownCacheTestCase() throws Exception {
// close the clients first
closeForceInvalidateCache();
// then close the servers
server1.invoke(() -> closeForceInvalidateCache());
server2.invoke(() -> closeForceInvalidateCache());
}
@SuppressWarnings("deprecation")
private static void closeForceInvalidateCache() {
AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
Cache cache = new ClientServerForceInvalidateDUnitTest().getCache();
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
}