blob: 2e78fa32645b741aa1d3c43a7016dcb0c0e84187 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static java.lang.Thread.yield;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.CONFLATE_EVENTS;
import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.internal.DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_OFF;
import static org.apache.geode.distributed.internal.DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_ON;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Iterator;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.ClientServerObserverAdapter;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* This test verifies the per-client queue conflation override functionality. It was taken from
* the existing ConflationDUnitTest.java and modified.
*
* @since GemFire 5.7
*/
@Category({ClientSubscriptionTest.class})
public class ClientConflationDUnitTest extends JUnit4DistributedTestCase {
// VM in which the cache server is created (see postSetUp / createServerCache).
VM vm0 = null; // server
// VM in which the subscribing client cache is created (see performSteps / createClientCache).
VM vm1 = null; // client
// Cache of the subscribing client; assigned by createClientCache.
private static Cache cacheClient = null;
// Cache of the feeder client that performs the puts; assigned by createClientCacheFeeder.
private static Cache cacheFeeder = null;
// Cache of the server; assigned by createServerCache.
private static Cache cacheServer = null;
// Port of the cache server, as returned by createServerCache when invoked from postSetUp.
private int PORT;
// Counter appended to the pool name in createPool2 so that each created pool gets a distinct name.
private static int poolNameCounter = 0;
// Name of the region that the server creates with conflation enabled.
private static final String REGION_NAME1 = "ClientConflationDUnitTest_region1";
// Name of the region that the server creates with conflation disabled.
private static final String REGION_NAME2 = "ClientConflationDUnitTest_region2";
/**
 * Prepares the fixture: disconnects all VMs from the distributed system, picks VM 0 (server) and
 * VM 1 (client) of host 0, switches on the slow-start dispatcher flag both locally and in the
 * server VM, and finally starts the server cache in VM 0, remembering the port it listens on.
 */
@Override
public final void postSetUp() throws Exception {
  disconnectAllFromDS();

  final Host firstHost = Host.getHost(0);
  vm0 = firstHost.getVM(0);
  vm1 = firstHost.getVM(1);

  // Slow start is requested in this VM as well as in the server VM.
  setIsSlowStart();
  vm0.invoke(() -> ClientConflationDUnitTest.setIsSlowStart());

  final Integer serverPort =
      (Integer) vm0.invoke(() -> ClientConflationDUnitTest.createServerCache());
  PORT = serverPort;
}
/**
 * Connects to the distributed system with the given properties and creates a cache on it.
 *
 * @param props the distributed-system properties handed to {@code getSystem}
 * @return the cache that was created; never {@code null}
 * @throws Exception if {@code CacheFactory.create} returns {@code null}
 */
private Cache createCache(Properties props) throws Exception {
  final DistributedSystem system = getSystem(props);
  final Cache created = CacheFactory.create(system);
  if (created != null) {
    return created;
  }
  throw new Exception("CacheFactory.create() returned null ");
}
/**
 * Turns on the testing hook that makes the dispatcher thread start a bit later, and sets the
 * {@code slowStartTimeForTesting} system property to 15000.
 */
public static void setIsSlowStart() {
  final String slowStartTime = "15000";
  CacheClientProxy.isSlowStartForTesting = true;
  System.setProperty("slowStartTimeForTesting", slowStartTime);
}
/**
 * Runs the scenario with the client's conflation property set to the default value, so the
 * expected update counts are the ones for the server-side region settings.
 */
@Test
public void testConflationDefault() {
  final String conflation = DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT;
  try {
    performSteps(conflation);
  } catch (Exception cause) {
    Assert.fail("testConflationDefault failed due to exception", cause);
  }
}
/**
 * Runs the scenario with the client's conflation property set to "on"; both regions are then
 * expected to deliver a single update.
 */
@Test
public void testConflationOn() {
  final String conflation = CLIENT_CONFLATION_PROP_VALUE_ON;
  try {
    performSteps(conflation);
  } catch (Exception cause) {
    Assert.fail("testConflationOn failed due to exception", cause);
  }
}
/**
 * Runs the scenario with the client's conflation property set to "off"; both regions are then
 * expected to deliver all four updates.
 */
@Test
public void testConflationOff() {
  final String conflation = CLIENT_CONFLATION_PROP_VALUE_OFF;
  try {
    performSteps(conflation);
  } catch (Exception cause) {
    Assert.fail("testConflationOff failed due to exception", cause);
  }
}
/**
 * Executes the scenario shared by all test methods for the given client conflation setting.
 *
 * <ol>
 * <li>create the feeder cache in this VM and the subscribing client cache in vm1</li>
 * <li>install the observer that resets the counters before interest recovery, zero the counters
 * and register interest from vm1</li>
 * <li>put the entries through the feeder, then switch slow start off on the server</li>
 * <li>after a fixed wait, verify that the server queues are empty and that the client saw the
 * expected number of events and the final value</li>
 * </ol>
 *
 * @param conflation value of the client's conflate-events property
 * @throws Exception if any step fails
 */
private void performSteps(String conflation) throws Exception {
  // Integer.valueOf replaces the deprecated-for-removal Integer(int) constructor.
  createClientCacheFeeder(NetworkUtils.getServerHostName(Host.getHost(0)),
      Integer.valueOf(PORT));
  vm1.invoke(() -> ClientConflationDUnitTest.createClientCache(
      NetworkUtils.getServerHostName(vm1.getHost()), Integer.valueOf(PORT), conflation));
  vm1.invoke(() -> ClientConflationDUnitTest.setClientServerObserverForBeforeInterestRecovery());
  vm1.invoke(() -> ClientConflationDUnitTest.setAllCountersZero());
  vm1.invoke(() -> ClientConflationDUnitTest.assertAllCountersZero());
  vm1.invoke(() -> ClientConflationDUnitTest.registerInterest());
  putEntries();
  vm0.invoke(() -> ConflationDUnitTestHelper.unsetIsSlowStart());
  // NOTE(review): fixed 20 s pause, presumably sized to outlast the 15000 ms slow-start delay so
  // the server queues can drain before they are asserted empty - confirm.
  Thread.sleep(20000);
  vm0.invoke(() -> ClientConflationDUnitTest.assertAllQueuesEmpty());
  vm1.invoke(() -> ClientConflationDUnitTest.assertCounterSizes(conflation));
  vm1.invoke(() -> ClientConflationDUnitTest.assertValue());
}
/**
 * Builds the distributed-system properties for a loner VM (multicast port 0, no locators) with
 * delta propagation disabled and the given conflate-events setting.
 *
 * @param conflation value stored under the conflate-events property
 * @return a new {@code Properties} instance holding the four settings
 */
private static Properties createProperties1(String conflation) {
  final Properties lonerProps = new Properties();
  lonerProps.setProperty(DELTA_PROPAGATION, "false");
  // No multicast and no locators.
  lonerProps.setProperty(MCAST_PORT, "0");
  lonerProps.setProperty(LOCATORS, "");
  lonerProps.setProperty(CONFLATE_EVENTS, conflation);
  return lonerProps;
}
/**
 * Creates a subscription-enabled pool that points at the given server and configures the region
 * attributes factory to use that pool.
 *
 * @param host server host name
 * @param factory attributes factory whose pool name is set to the new pool's name
 * @param port server port
 */
private static void createPool2(String host, AttributesFactory factory, Integer port) {
  final PoolFactory poolFactory = PoolManager.createFactory();
  poolFactory.addServer(host, port);
  poolFactory.setSubscriptionEnabled(true);
  poolFactory.setReadTimeout(10000);
  poolFactory.setSocketBufferSize(32768);
  poolFactory.setPingInterval(1000);
  poolFactory.setMinConnections(3);
  poolFactory.setSubscriptionRedundancy(-1);

  // The running counter keeps the name distinct for every pool created.
  final String poolName = "superpoolish" + (poolNameCounter++);
  final Pool created = poolFactory.create(poolName);
  factory.setPoolName(created.getName());
}
/**
 * Creates the subscribing client cache with two LOCAL-scope regions. Each region is configured
 * with its own subscription-enabled pool and its own listener, which counts the create and update
 * events received for that region in the corresponding static counters.
 *
 * @param host server host name
 * @param port server port
 * @param conflation value of the client's conflate-events property
 * @throws Exception if the cache cannot be created
 */
public static void createClientCache(String host, Integer port, String conflation)
    throws Exception {
  ClientConflationDUnitTest test = new ClientConflationDUnitTest();
  cacheClient = test.createCache(createProperties1(conflation));
  AttributesFactory factory = new AttributesFactory();
  factory.setScope(Scope.LOCAL);

  // Region 1: counts into counterCreate1 / counterUpdate1.
  createPool2(host, factory, port);
  factory.setCacheListener(new CacheListenerAdapter() {
    @Override
    public void afterCreate(EntryEvent event) {
      synchronized (ClientConflationDUnitTest.class) {
        counterCreate1++;
      }
    }

    @Override
    public void afterUpdate(EntryEvent event) {
      // Use the same class-level lock as afterCreate; the counters are static, so locking on the
      // listener instance would not be consistent with the other increments.
      synchronized (ClientConflationDUnitTest.class) {
        counterUpdate1++;
      }
    }
  });
  RegionAttributes attrs = factory.create();
  cacheClient.createRegion(REGION_NAME1, attrs);

  // Region 2: a second pool and listener, counting into counterCreate2 / counterUpdate2.
  createPool2(host, factory, port);
  factory.setCacheListener(new CacheListenerAdapter() {
    @Override
    public void afterCreate(EntryEvent event) {
      synchronized (ClientConflationDUnitTest.class) {
        counterCreate2++;
      }
    }

    @Override
    public void afterUpdate(EntryEvent event) {
      synchronized (ClientConflationDUnitTest.class) {
        counterUpdate2++;
      }
    }
  });
  attrs = factory.create();
  cacheClient.createRegion(REGION_NAME2, attrs);
}
/**
 * Creates the feeder client cache (default conflation setting) with both test regions, LOCAL
 * scope, sharing a single pool that points at the given server.
 *
 * @param host server host name
 * @param port server port
 * @throws Exception if the cache cannot be created
 */
public static void createClientCacheFeeder(String host, Integer port) throws Exception {
  final ClientConflationDUnitTest helper = new ClientConflationDUnitTest();
  cacheFeeder = helper
      .createCache(createProperties1(DistributionConfig.CLIENT_CONFLATION_PROP_VALUE_DEFAULT));

  final AttributesFactory regionFactory = new AttributesFactory();
  regionFactory.setScope(Scope.LOCAL);
  createPool2(host, regionFactory, port);

  // Fresh attributes are created for each region, both from the same factory configuration.
  cacheFeeder.createRegion(REGION_NAME1, regionFactory.create());
  cacheFeeder.createRegion(REGION_NAME2, regionFactory.create());
}
/**
 * Variables to count operations (messages received on the client from the server). They are
 * incremented by the listeners installed in createClientCache and reset by setAllCountersZero.
 */
// For the first region, whose server-side conflation setting is on
static volatile int counterCreate1 = 0;
static volatile int counterUpdate1 = 0;
// For the second region, whose server-side conflation setting is off
static volatile int counterCreate2 = 0;
static volatile int counterUpdate2 = 0;
/**
 * Asserts that all four listener counters are zero.
 */
public static void assertAllCountersZero() {
  // JUnit's order is (message, expected, actual); the label names the counter that failed.
  assertEquals("counterCreate1", 0, counterCreate1);
  assertEquals("counterUpdate1", 0, counterUpdate1);
  assertEquals("counterCreate2", 0, counterCreate2);
  assertEquals("counterUpdate2", 0, counterUpdate2);
}
/**
 * Resets all four listener counters to zero.
 */
public static void setAllCountersZero() {
  // Assignment chains evaluate right to left, so the counters are cleared in the order
  // counterCreate1, counterUpdate1, counterCreate2, counterUpdate2.
  counterUpdate2 = counterCreate2 = counterUpdate1 = counterCreate1 = 0;
}
/**
 * Enables the before-recover-interest callback flag on {@code PoolImpl} and installs an observer
 * whose {@code beforeInterestRecovery} callback resets all listener counters to zero.
 */
public static void setClientServerObserverForBeforeInterestRecovery() {
  PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;

  final ClientServerObserverAdapter resetCountersObserver = new ClientServerObserverAdapter() {
    @Override
    public void beforeInterestRecovery() {
      setAllCountersZero();
    }
  };
  ClientServerObserverHolder.setInstance(resetCountersObserver);
}
/**
 * Walks every client proxy of every cache server in the server cache and asserts that its queue
 * size is zero, so that the later listener event counts can be relied upon.
 */
public static void assertAllQueuesEmpty() {
  for (Object server : cacheServer.getCacheServers()) {
    for (Object proxy : ((CacheServerImpl) server).getAcceptor().getCacheClientNotifier()
        .getClientProxies()) {
      final int pending = ((CacheClientProxy) proxy).getQueueSize();
      assertTrue("Queue size expected to be zero but is " + pending, pending == 0);
    }
  }
}
/**
* assert the listener counters size are as expected
*
*/
public static void assertCounterSizes(String conflation) {
// we do 5 puts on each key, so:
// for writer 1 default conflation is on
final int create1 = 1;
int update1 = 1;
// for writer 2 default conflation is off
final int create2 = 1;
int update2 = 4;
if (conflation.equals(CLIENT_CONFLATION_PROP_VALUE_ON)) {
// override
update2 = 1;
} else if (conflation.equals(CLIENT_CONFLATION_PROP_VALUE_OFF)) {
// override
update1 = 4;
}
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
yield(); // TODO is this necessary?
return counterCreate1 == create1;
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
final int u1 = update1;
ev = new WaitCriterion() {
@Override
public boolean done() {
yield(); // TODO is this necessary?
return counterUpdate1 == u1;
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
ev = new WaitCriterion() {
@Override
public boolean done() {
yield(); // TODO is this necessary?
return counterCreate2 == create2;
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
final int u2 = update2;
ev = new WaitCriterion() {
@Override
public boolean done() {
yield(); // TODO is this necessary?
return counterUpdate2 == u2;
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
}
/**
 * Asserts that both client regions contain "key-1" and that its final value is "55".
 */
public static void assertValue() {
  try {
    Region r1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
    Region r2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
    assertTrue(REGION_NAME1 + " should contain key-1", r1.containsKey("key-1"));
    // assertEquals reports the actual value on failure instead of a bare "false".
    assertEquals("55", r1.get("key-1"));
    assertTrue(REGION_NAME2 + " should contain key-1", r2.containsKey("key-1"));
    assertEquals("55", r2.get("key-1"));
  } catch (Exception e) {
    // Keep the original exception as the cause so its stack trace is not lost.
    Assert.fail("Exception in trying to get due to " + e, e);
  }
}
/**
 * Creates the server cache with two DISTRIBUTED_ACK regions (the first with conflation enabled,
 * the second with conflation disabled) and starts a cache server on a randomly chosen available
 * port.
 *
 * @return the port the started cache server reports
 * @throws Exception if the cache cannot be created or the server cannot be started
 */
public static Integer createServerCache() throws Exception {
  Properties props = new Properties();
  props.setProperty(DELTA_PROPAGATION, "false");
  ClientConflationDUnitTest test = new ClientConflationDUnitTest();
  cacheServer = test.createCache(props);

  // The same factory yields both attribute sets; only the conflation flag differs.
  AttributesFactory factory = new AttributesFactory();
  factory.setScope(Scope.DISTRIBUTED_ACK);
  factory.setEnableConflation(true);
  RegionAttributes attrs1 = factory.create();
  factory.setEnableConflation(false);
  RegionAttributes attrs2 = factory.create();
  cacheServer.createRegion(REGION_NAME1, attrs1);
  cacheServer.createRegion(REGION_NAME2, attrs2);

  CacheServer server = cacheServer.addCacheServer();
  int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
  server.setPort(port);
  server.setNotifyBySubscription(true);
  server.setSocketBufferSize(32768);
  server.start();
  // Integer.valueOf replaces the deprecated-for-removal Integer(int) constructor.
  return Integer.valueOf(server.getPort());
}
/**
 * Closes the subscribing client cache and disconnects its distributed system. Does nothing when
 * the cache was never created or is already closed.
 */
public static void closeCacheClient() {
  if (cacheClient == null || cacheClient.isClosed()) {
    return;
  }
  cacheClient.close();
  cacheClient.getDistributedSystem().disconnect();
}
/**
 * Closes the feeder cache and disconnects its distributed system. Does nothing when the cache
 * was never created or is already closed.
 */
public static void closeCacheFeeder() {
  if (cacheFeeder == null || cacheFeeder.isClosed()) {
    return;
  }
  cacheFeeder.close();
  cacheFeeder.getDistributedSystem().disconnect();
}
/**
 * Closes the server cache and disconnects its distributed system. Does nothing when the cache
 * was never created or is already closed.
 */
public static void closeCacheServer() {
  if (cacheServer == null || cacheServer.isClosed()) {
    return;
  }
  cacheServer.close();
  cacheServer.getDistributedSystem().disconnect();
}
/**
 * Registers interest in "ALL_KEYS" with the server for both client regions.
 */
public static void registerInterest() {
  try {
    Region region1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
    Region region2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
    assertTrue(REGION_NAME1 + " was not found in the client cache", region1 != null);
    assertTrue(REGION_NAME2 + " was not found in the client cache", region2 != null);
    region1.registerInterest("ALL_KEYS");
    region2.registerInterest("ALL_KEYS");
  } catch (CacheWriterException e) {
    // Keep the original exception as the cause so its stack trace is not lost.
    Assert.fail("test failed due to " + e, e);
  }
}
/**
 * Unregisters interest in "ALL_KEYS" with the server for both client regions.
 */
public static void unregisterInterest() {
  try {
    Region region1 = cacheClient.getRegion(SEPARATOR + REGION_NAME1);
    Region region2 = cacheClient.getRegion(SEPARATOR + REGION_NAME2);
    region1.unregisterInterest("ALL_KEYS");
    region2.unregisterInterest("ALL_KEYS");
  } catch (CacheWriterException e) {
    // Keep the original exception as the cause so its stack trace is not lost.
    Assert.fail("test failed due to " + e, e);
  }
}
/**
 * Performs five puts on "key-1" in each feeder region, alternating between the two regions, with
 * the values "11" through "55".
 */
public static void putEntries() {
  try {
    LogWriterUtils.getLogWriter().info("Putting entries...");
    final Region first = cacheFeeder.getRegion(SEPARATOR + REGION_NAME1);
    final Region second = cacheFeeder.getRegion(SEPARATOR + REGION_NAME2);
    final String[] values = {"11", "22", "33", "44", "55"};
    for (String value : values) {
      // Region 1 is always written before region 2 for the same value.
      first.put("key-1", value);
      second.put("key-1", value);
    }
  } catch (Exception ex) {
    ex.printStackTrace();
    Assert.fail("failed while region.put()", ex);
  }
}
/**
 * Closes the feeder cache in this VM, the client cache in vm1 and the server cache in vm0. Each
 * close is attempted even when an earlier one throws, so that a failure in one VM does not leave
 * caches open in the others.
 */
@Override
public final void preTearDown() throws Exception {
  try {
    // close feeder (this VM)
    closeCacheFeeder();
  } finally {
    try {
      // close client
      vm1.invoke(() -> ClientConflationDUnitTest.closeCacheClient());
    } finally {
      // close server
      vm0.invoke(() -> ClientConflationDUnitTest.closeCacheServer());
    }
  }
}
}