blob: 4eb487dda55c2b027888deae740658f89d4c1852 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Iterator;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* This test verifies the per-client notify-by-subscription (NBS) override functionality along with
* register interest new receiveValues flag. Taken from the existing ClientConflationDUnitTest.java
* and modified.
*
* @since GemFire 6.0.3
*/
@Category({ClientServerTest.class})
public class ClientInterestNotifyDUnitTest extends JUnit4DistributedTestCase {
class EventListener extends CacheListenerAdapter {
public EventListener(String name) {
m_name = name;
}
private String m_name = null;
private int m_creates = 0;
private int m_updates = 0;
private int m_invalidates = 0;
private int m_destroys = 0;
@Override
public void afterCreate(EntryEvent event) {
m_creates++;
}
@Override
public void afterUpdate(EntryEvent event) {
m_updates++;
}
@Override
public void afterInvalidate(EntryEvent event) {
m_invalidates++;
}
@Override
public void afterDestroy(EntryEvent event) {
m_destroys++;
}
public void reset() {
m_creates = 0;
m_updates = 0;
m_invalidates = 0;
m_destroys = 0;
}
// validate expected event counts with actual counted
public void validate(int creates, int updates, int invalidates, int destroys) {
// Wait for the last destroy event to arrive.
try {
await().until(() -> {
return (destroys == m_destroys);
});
} catch (Exception ex) {
// The event is not received in a given wait time.
// The check will be done below to report the valid reason.
}
GemFireCacheImpl.getInstance().getLogger()
.info(m_name + ": creates: expected=" + creates + ", actual=" + m_creates);
GemFireCacheImpl.getInstance().getLogger()
.info(m_name + ": updates: expected=" + updates + ", actual=" + m_updates);
GemFireCacheImpl.getInstance().getLogger()
.info(m_name + ": invalidates: expected=" + invalidates + ", actual=" + m_invalidates);
GemFireCacheImpl.getInstance().getLogger()
.info(m_name + ": destroys: expected=" + destroys + ", actual=" + m_destroys);
assertEquals("Creates :", creates, m_creates);
assertEquals("Updates :", updates, m_updates);
assertEquals("Invalidates :", invalidates, m_invalidates);
assertEquals("Destroys :", destroys, m_destroys);
}
}
// server is on master controller
VM vm0 = null; // feeder
VM vm1 = null; // client1
/*
* VM vm2 = null; // client2 VM vm3 = null; // client3
*/
private static Cache cacheServer = null;
private int PORT;
private static int poolNameCounter = 0;
// Region 1 only does interest registrations with receiveValues flag set to true
private static final String REGION_NAME1 = "ClientInterestNotifyDUnitTest_region1";
// Region 2 only does interest registrations with receiveValues flag set to false
private static final String REGION_NAME2 = "ClientInterestNotifyDUnitTest_region2";
// Region 3 does NOT register any interest
private static final String REGION_NAME3 = "ClientInterestNotifyDUnitTest_region3";
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
vm0 = host.getVM(0);
vm1 = host.getVM(1);
}
private Cache createCache(Properties props) throws Exception {
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
DistributedSystem ds = DistributedSystem.connect(props);
Cache cache = CacheFactory.create(ds);
if (cache == null) {
throw new Exception("CacheFactory.create() returned null ");
}
return cache;
}
@Test
public void testInterestNotify() throws Exception {
performSteps();
}
private void performSteps() throws Exception {
// Server is created with notify-by-subscription (NBS) set to false.
PORT = createServerCache();
Host host = Host.getHost(0);
// Create a feeder.
vm0.invoke(() -> ClientInterestNotifyDUnitTest
.createClientCacheFeeder(NetworkUtils.getServerHostName(host), new Integer(PORT)));
// Client 1 overrides NBS to true.
// Client 2 "overrides" NSB to false.
// Client 3 uses the default NBS which is false on the server.
vm1.invoke(() -> ClientInterestNotifyDUnitTest
.createClientCache(NetworkUtils.getServerHostName(host), new Integer(PORT), "ClientOn"));
// Feeder doFeed does one put on one key for each of the 3 regions so
// that the following client RI with ALL_KEYS and KEYS_VALUE result works.
vm0.invoke(() -> ClientInterestNotifyDUnitTest.doFeed());
// RI on ALL_KEYS with InterestResultPolicy KEYS_VALUES.
vm1.invoke(() -> ClientInterestNotifyDUnitTest.registerInterest());
// Get key for region 3 for all clients to check no unwanted notifications
// arrive on client 1 region 3 since we do not register interest on any
// client but notifications should arrive for client 2 and client 3.
vm1.invoke(() -> ClientInterestNotifyDUnitTest.getEntries());
// Feeder doEntryOps does 2 puts, 1 invalidate and 1 destroy on a
// single key for each of the 3 regions.
vm0.invoke(() -> ClientInterestNotifyDUnitTest.doEntryOps());
waitForQueuesToDrain();
// Unregister interest to check it works and no extra notifications received.
vm1.invoke(() -> ClientInterestNotifyDUnitTest.unregisterInterest());
// Feeder doEntryOps again does 2 puts, 1 invalidate and 1 destroy on a
// single key for each of the 3 regions while no interest on the clients.
vm0.invoke(() -> ClientInterestNotifyDUnitTest.doEntryOps());
assertAllQueuesEmpty(); // since no client has registered interest
// Re-register interest on all clients except for region 3 again.
vm1.invoke(() -> ClientInterestNotifyDUnitTest.registerInterest());
// Feeder doEntryOps again does 2 puts, 1 invalidate and 1 destroy on a
// single key for each of the 3 regions after clients re-register interest.
vm0.invoke(() -> ClientInterestNotifyDUnitTest.doEntryOps());
waitForQueuesToDrain();
// doValidation on all listeners:
// Client invokes listeners as follows:
// 1. For update notification:
// a. afterCreate() if the key does not exist in the local cache.
// b. afterUpdate() if the key exists even if the entry is invalid.
// 2. For invalidate notification:
// a. afterInvalidate() only if the key/entry exists and is not already invalid.
// 3. For destroy notification:
// a. afterDestroy() only if the key/entry is not already destroyed.
// RI is done for ALL_KEYS with InterestResultPolicy KEYS_VALUES.
// For region 1 we set the RI receiveValues param to true so
// that it behaves like notify-by-subscription is set to true.
// For region 2 we set the RI receiveValues param to false so
// that it behaves like notify-by-subscription is set to false but
// only for matching interested keys.
// For region 3 we do not register any interest to check that it
// does not get any events other than those expected from the
// server's notify-by-subscription setting which may be overridden.
// Region 3 calls a single afterCreate() because of the single get() done.
// This is to verify client 1 does not receive any unwanted event traffic.
// Region 3 of clients 2 and 3 receive an invalidate and a destroy each
// because NBS is set to false for those clients.
vm1.invoke(() -> ClientInterestNotifyDUnitTest.doValidation(REGION_NAME1, 1, 3, 2, 2));
vm1.invoke(() -> ClientInterestNotifyDUnitTest.doValidation(REGION_NAME2, 0, 0, 1, 1));
vm1.invoke(() -> ClientInterestNotifyDUnitTest.doValidation(REGION_NAME3, 1, 0, 0, 0));
}
/**
* create properties for a loner VM
*/
private static Properties createProperties1(/* String nbs */) {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
return props;
}
private static void createPool2(String host, AttributesFactory factory, Integer port) {
PoolFactory pf = PoolManager.createFactory();
pf.addServer(host, port.intValue()).setSubscriptionEnabled(true)
.setReadTimeout(10000).setSocketBufferSize(32768).setPingInterval(1000).setMinConnections(3)
.setSubscriptionRedundancy(-1);
Pool pool = pf.create("superpoolish" + (poolNameCounter++));
factory.setPoolName(pool.getName());
}
/**
* Do validation based on feeder events received by the client
*/
public static void doValidation(String region, int creates, int updates, int invalidates,
int destroys) {
Cache cacheClient = GemFireCacheImpl.getInstance();
EventListener listener = null;
listener = (EventListener) cacheClient.getRegion(region).getAttributes().getCacheListeners()[0];
listener.validate(creates, updates, invalidates, destroys);
}
/**
* create client with 3 regions each with a unique listener
*
*/
public static void createClientCache(String host, Integer port, /* String nbs, */
String name) throws Exception {
ClientInterestNotifyDUnitTest test = new ClientInterestNotifyDUnitTest();
Cache cacheClient = test.createCache(createProperties1(/* nbs */));
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
createPool2(host, factory, port);
factory.setCacheListener(test.new EventListener(name + REGION_NAME1));
RegionAttributes attrs = factory.create();
cacheClient.createRegion(REGION_NAME1, attrs);
factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
createPool2(host, factory, port);
factory.setCacheListener(test.new EventListener(name + REGION_NAME2));
attrs = factory.create();
cacheClient.createRegion(REGION_NAME2, attrs);
factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
createPool2(host, factory, port);
factory.setCacheListener(test.new EventListener(name + REGION_NAME3));
attrs = factory.create();
cacheClient.createRegion(REGION_NAME3, attrs);
}
public static void createClientCacheFeeder(String host, Integer port) throws Exception {
ClientInterestNotifyDUnitTest test = new ClientInterestNotifyDUnitTest();
Cache cacheFeeder = test.createCache(createProperties1(
/* DistributionConfig.NOTIFY_BY_SUBSCRIPTION_OVERRIDE_PROP_VALUE_DEFAULT */));
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
createPool2(host, factory, port);
RegionAttributes attrs = factory.create();
cacheFeeder.createRegion(REGION_NAME1, attrs);
cacheFeeder.createRegion(REGION_NAME2, attrs);
cacheFeeder.createRegion(REGION_NAME3, attrs);
}
/**
* Assert all queues are empty to aid later assertion for listener event counts.
*/
// NOTE: replaced with waitForQueuesToDrain() using waitcriterion to avoid
// occasional failures in precheckins and cruisecontrol.
public static void assertAllQueuesEmpty() {
Iterator servers = cacheServer.getCacheServers().iterator();
assertTrue("No servers found!", servers.hasNext());
while (servers.hasNext()) {
Iterator proxies = ((CacheServerImpl) servers.next()).getAcceptor().getCacheClientNotifier()
.getClientProxies().iterator();
assertTrue("No proxies found!", proxies.hasNext());
while (proxies.hasNext()) {
int qsize = ((CacheClientProxy) proxies.next()).getQueueSize();
assertTrue("Queue size expected to be zero but is " + qsize, qsize == 0);
}
}
}
public static void waitForQueuesToDrain() {
WaitCriterion wc = new WaitCriterion() {
String excuse;
@Override
public boolean done() {
// assume a single cache server as configured in this test
CacheServerImpl bridgeServer =
(CacheServerImpl) cacheServer.getCacheServers().iterator().next();
if (bridgeServer == null) {
excuse = "No Cache Server";
return false;
}
Iterator proxies =
bridgeServer.getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
if (!proxies.hasNext()) {
excuse = "No CacheClientProxy";
return false;
}
while (proxies.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy) proxies.next();
if (proxy == null) {
excuse = "No CacheClientProxy";
return false;
}
// Verify the queue size
int sz = proxy.getQueueSize();
if (0 != sz) {
excuse = "Queue did not drain. Expected size = 0, actual = " + sz + "for " + proxy;
return false;
}
}
return true;
}
@Override
public String description() {
return excuse;
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
/**
* create a server cache and start the server
*
*/
public static Integer createServerCache() throws Exception {
ClientInterestNotifyDUnitTest test = new ClientInterestNotifyDUnitTest();
Properties props = new Properties();
props.setProperty(DELTA_PROPAGATION, "false");
cacheServer = test.createCache(props);
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(false);
RegionAttributes attrs = factory.create();
cacheServer.createRegion(REGION_NAME1, attrs);
cacheServer.createRegion(REGION_NAME2, attrs);
cacheServer.createRegion(REGION_NAME3, attrs);
CacheServer server = cacheServer.addCacheServer();
int port = getRandomAvailableTCPPort();
server.setPort(port);
server.setNotifyBySubscription(true);
server.setSocketBufferSize(32768);
server.start();
return new Integer(server.getPort());
}
/**
* close the client cache
*
*/
public static void closeCache() {
Cache cacheClient = GemFireCacheImpl.getInstance();
if (cacheClient != null && !cacheClient.isClosed()) {
cacheClient.close();
cacheClient.getDistributedSystem().disconnect();
}
}
/**
* close the server cache
*
*/
public static void closeCacheServer() {
if (cacheServer != null && !cacheServer.isClosed()) {
cacheServer.close();
cacheServer.getDistributedSystem().disconnect();
}
}
/**
* register interest with the server on ALL_KEYS
*
*/
public static void registerInterest() {
try {
Cache cacheClient = GemFireCacheImpl.getInstance();
Region region1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
Region region2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
// We intentionally do not register interest in region 3 to check no events recvd.
Region region3 = cacheClient.getRegion(SEPARATOR + REGION_NAME3);
assertTrue(region1 != null);
assertTrue(region2 != null);
assertTrue(region3 != null);
region1.registerInterest("ALL_KEYS", InterestResultPolicy.KEYS_VALUES, false, true);
region2.registerInterest("ALL_KEYS", InterestResultPolicy.KEYS_VALUES, false, false);
// We intentionally do not register interest in region 3 to check no events recvd.
// region3.registerInterestBlah();
} catch (CacheWriterException e) {
fail("test failed due to " + e);
}
}
/**
* register interest with the server on ALL_KEYS
*
*/
public static void unregisterInterest() {
try {
Cache cacheClient = GemFireCacheImpl.getInstance();
Region region1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
Region region2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
region1.unregisterInterest("ALL_KEYS");
region2.unregisterInterest("ALL_KEYS");
} catch (CacheWriterException e) {
fail("test failed due to " + e);
}
}
/**
* Do 2 puts, 1 invalidate and 1 destroy for a key "key-1"
*/
public static void doEntryOps() {
try {
LogWriterUtils.getLogWriter().info("Putting entries...");
Cache cacheClient = GemFireCacheImpl.getInstance();
Region r1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
Region r2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
Region r3 = cacheClient.getRegion(SEPARATOR + REGION_NAME3);
r1.put("key-1", "11");
r2.put("key-1", "11");
r3.put("key-1", "11");
r1.put("key-1", "22");
r2.put("key-1", "22");
r3.put("key-1", "22");
r1.invalidate("key-1");
r2.invalidate("key-1");
r3.invalidate("key-1");
r1.destroy("key-1");
r2.destroy("key-1");
r3.destroy("key-1");
} catch (Exception ex) {
ex.printStackTrace();
Assert.fail("failed while region doing ops", ex);
}
}
/**
* Do initial puts
*/
public static void doFeed() {
try {
LogWriterUtils.getLogWriter().info("Putting entries...");
Cache cacheClient = GemFireCacheImpl.getInstance();
Region r1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
Region r2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
Region r3 = cacheClient.getRegion(SEPARATOR + REGION_NAME3);
r1.put("key-1", "00");
r2.put("key-1", "00");
r3.put("key-1", "00");
} catch (Exception ex) {
ex.printStackTrace();
Assert.fail("failed while region doing ops", ex);
}
}
/**
* Get entries on all clients' region 3 since it does not register interest.
*/
public static void getEntries() {
try {
LogWriterUtils.getLogWriter().info("Getting entries...");
Cache cacheClient = GemFireCacheImpl.getInstance();
Region r3 = cacheClient.getRegion(SEPARATOR + REGION_NAME3);
r3.get("key-1");
} catch (Exception ex) {
ex.printStackTrace();
Assert.fail("failed while region doing ops", ex);
}
}
/**
* close the caches in tearDown
*/
@Override
public final void preTearDown() throws Exception {
vm0.invoke(() -> ClientInterestNotifyDUnitTest.closeCache());
vm1.invoke(() -> ClientInterestNotifyDUnitTest.closeCache());
closeCacheServer();
}
}