blob: 16360878e616907216e2460a9125358ce92cfd97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.cache30.ClientServerTestCase.TEST_POOL_NAME;
import static org.apache.geode.cache30.ClientServerTestCase.configureConnectionPool;
import static org.apache.geode.cache30.ClientServerTestCase.configureConnectionPoolWithNameAndFactory;
import static org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.getInstance;
import static org.apache.geode.logging.internal.spi.LogWriterLevel.ALL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.runners.MethodSorters.NAME_ASCENDING;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.DataSerializable;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.CertifiableTestCacheListener;
import org.apache.geode.cache30.TestCacheWriter;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.EntryExpiryTask;
import org.apache.geode.internal.cache.InternalCacheServer;
import org.apache.geode.internal.cache.PoolStats;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifierStats;
import org.apache.geode.internal.logging.LocalLogWriter;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* This class tests the client connection pool in GemFire. It does so by creating a cache server
* with a cache and a pre-defined region and a data loader. The client creates the same region with
* a pool (this happens in the controller VM). the client then spins up 10 different threads and
* issues gets on keys. The server data loader returns the data to the client.
* <p>
* Test uses Groboutils TestRunnable objects to achieve multi threading behavior in the test.
*/
@Category({ClientServerTest.class})
@FixMethodOrder(NAME_ASCENDING)
public class ConnectionPoolDUnitTest extends JUnit4CacheTestCase {
/**
* The port on which the cache server was started in this VM
*/
private static int bridgeServerPort;
private static int numberOfAfterInvalidates;
private static int numberOfAfterCreates;
private static int numberOfAfterUpdates;
private static final int TYPE_CREATE = 0;
private static final int TYPE_UPDATE = 1;
private static final int TYPE_INVALIDATE = 2;
private static final int TYPE_DESTROY = 3;
@Rule
public DistributedRule distributedRule = new DistributedRule();
private VM vm0;
private VM vm1;
private VM vm2;
private VM vm3;
@Before
public void setup() {
vm0 = VM.getVM(0);
vm1 = VM.getVM(1);
vm2 = VM.getVM(2);
vm3 = VM.getVM(3);
}
@After
public void tearDown() {
Invoke.invokeInEveryVM(() -> {
if (basicGetCache() != null) {
basicGetCache().close();
}
PoolManager.getAll().forEach((key, value) -> value.destroy());
});
}
private static PoolImpl getPool(Region r) {
PoolImpl result = null;
String poolName = r.getAttributes().getPoolName();
if (poolName != null) {
result = (PoolImpl) PoolManager.find(poolName);
}
return result;
}
private static TestCacheWriter getTestWriter(Region r) {
return (TestCacheWriter) r.getAttributes().getCacheWriter();
}
private void startBridgeServer(int port)
throws IOException {
Cache cache = getCache();
CacheServer bridge = cache.addCacheServer();
bridge.setPort(port);
bridge.setMaxThreads(0);
bridge.setLoadPollInterval(CacheServer.DEFAULT_LOAD_POLL_INTERVAL);
bridge.start();
bridgeServerPort = bridge.getPort();
}
/**
* Stops the cache server that serves up the given cache.
*
* @since GemFire 4.0
*/
private void stopBridgeServer(Cache cache) {
CacheServer bridge = cache.getCacheServers().iterator().next();
bridge.stop();
assertThat(bridge.isRunning()).isFalse();
}
private void createLonerDS() {
disconnectFromDS();
InternalDistributedSystem ds = getLonerSystem();
assertThat(ds.getDistributionManager().getOtherDistributionManagerIds()).isEmpty();
}
/**
* Returns region attributes for a <code>LOCAL</code> region
*/
protected <K, V> RegionAttributes<K, V> getRegionAttributes() {
AttributesFactory<K, V> factory = new AttributesFactory<>();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
return factory.create();
}
private static class EventWrapper {
final EntryEvent event;
final Object key;
final Object val;
final Object arg;
final int type;
EventWrapper(EntryEvent ee, int type) {
this.event = ee;
this.key = ee.getKey();
this.val = ee.getNewValue();
this.arg = ee.getCallbackArgument();
this.type = type;
}
public String toString() {
return "EventWrapper: event=" + event + ", type=" + type;
}
}
static class ControlListener extends CacheListenerAdapter<Object, Object> {
final LinkedList<EventWrapper> events = new LinkedList<>();
final Object CONTROL_LOCK = new Object();
void waitWhileNotEnoughEvents(int eventCount) {
long maxMillis = System.currentTimeMillis() + (long) 60000;
synchronized (this.CONTROL_LOCK) {
try {
while (this.events.size() < eventCount) {
long waitMillis = maxMillis - System.currentTimeMillis();
if (waitMillis < 10) {
break;
}
this.CONTROL_LOCK.wait(waitMillis);
}
} catch (InterruptedException abort) {
fail("interrupted");
}
}
}
@Override
public void afterCreate(EntryEvent e) {
synchronized (this.CONTROL_LOCK) {
this.events.add(new EventWrapper(e, TYPE_CREATE));
this.CONTROL_LOCK.notifyAll();
}
}
@Override
public void afterUpdate(EntryEvent e) {
synchronized (this.CONTROL_LOCK) {
this.events.add(new EventWrapper(e, TYPE_UPDATE));
this.CONTROL_LOCK.notifyAll();
}
}
@Override
public void afterInvalidate(EntryEvent e) {
synchronized (this.CONTROL_LOCK) {
this.events.add(new EventWrapper(e, TYPE_INVALIDATE));
this.CONTROL_LOCK.notifyAll();
}
}
@Override
public void afterDestroy(EntryEvent e) {
synchronized (this.CONTROL_LOCK) {
this.events.add(new EventWrapper(e, TYPE_DESTROY));
this.CONTROL_LOCK.notifyAll();
}
}
}
private void verifyServerCount(final PoolImpl pool, final int expectedCount) {
getCache().getLogger().info("verifyServerCount expects=" + expectedCount);
await().alias("Expecting found server count to match expected count")
.until(() -> pool.getConnectedServerCount() == expectedCount);
}
/**
* Tests that the callback argument is sent to the server
*/
@Test
public void test001CallbackArg() throws CacheException {
final String name = this.getName();
final Object createCallbackArg = "CREATE CALLBACK ARG";
final Object updateCallbackArg = "PUT CALLBACK ARG";
vm0.invoke("Create Cache Server", () -> {
CacheWriter<Object, Object> cw = new TestCacheWriter<Object, Object>() {
@Override
public final void beforeUpdate2(EntryEvent event) throws CacheWriterException {
Object beca = event.getCallbackArgument();
assertThat(updateCallbackArg).isEqualTo(beca);
}
@Override
public void beforeCreate2(EntryEvent event) throws CacheWriterException {
Object beca = event.getCallbackArgument();
assertThat(createCallbackArg).isEqualTo(beca);
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, cw);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, NetworkUtils.getServerHostName(),
new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
});
vm1.invoke("Add entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.create(i, "old" + i, createCallbackArg);
}
for (int i = 0; i < 10; i++) {
region.put(i, "new" + i, updateCallbackArg);
}
});
vm0.invoke("Check cache writer", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
TestCacheWriter writer = getTestWriter(region);
assertThat(writer.wasInvoked()).isTrue();
});
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
}
/**
* Tests that consecutive puts have the callback assigned appropriately.
*/
@Test
public void test002CallbackArg2() throws CacheException {
final String name = this.getName();
final Object createCallbackArg = "CREATE CALLBACK ARG";
vm0.invoke("Create Cache Server", () -> {
CacheWriter<Object, Object> cacheWriter = new TestCacheWriter<Object, Object>() {
@Override
public void beforeCreate2(EntryEvent event) throws CacheWriterException {
Integer key = (Integer) event.getKey();
if (key % 2 == 0) {
Object callbackArgument = event.getCallbackArgument();
assertThat(createCallbackArg).isEqualTo(callbackArgument);
} else {
Object callbackArgument = event.getCallbackArgument();
assertThat(callbackArgument).isNull();
}
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, cacheWriter);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
});
vm1.invoke("Add entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
region.create(i, "old" + i, createCallbackArg);
} else {
region.create(i, "old" + i);
}
}
});
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
vm0.invoke("Check cache writer", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
TestCacheWriter writer = getTestWriter(region);
assertThat(writer.wasInvoked()).isTrue();
});
}
/**
* Tests for bug 36684 by having two cache servers with cacheloaders that should always return a
* value and one client connected to each server reading values. If the bug exists, the clients
* will get null sometimes.
*/
@Test
public void test003Bug36684() throws CacheException, InterruptedException {
final String name = this.getName();
// Create the cache servers with distributed, mirrored region
Stream.of(vm0, vm1).forEach(vm -> {
vm.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {
}
};
RegionFactory<Object, Object> factory =
getBridgeServerMirroredAckRegionAttributes(cl);
createRegion(name, factory);
startBridgeServer(0);
});
logger.info("before create server");
});
// Create cache server clients
final int numberOfKeys = 1000;
final String host0 = NetworkUtils.getServerHostName();
final int vm0Port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final int vm1Port = vm1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
SerializableRunnable createClient = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
// reset all static listener variables in case this is being rerun in a subclass
numberOfAfterInvalidates = 0;
numberOfAfterCreates = 0;
numberOfAfterUpdates = 0;
// create the region
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
// create bridge writer
configureConnectionPool(factory, host0, new int[] {vm0Port, vm1Port}, true, -1,
-1, null);
createRegion(name, factory);
}
};
logger.info("before create client");
vm2.invoke("Create Cache Server Client", createClient);
vm3.invoke("Create Cache Server Client", createClient);
// Initialize each client with entries (so that afterInvalidate is called)
SerializableRunnable initializeClient = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
// StringBuffer errors = new StringBuffer();
numberOfAfterInvalidates = 0;
numberOfAfterCreates = 0;
numberOfAfterUpdates = 0;
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < numberOfKeys; i++) {
String expected = "key-" + i;
String actual = (String) region.get("key-" + i);
assertThat(expected).isEqualTo(actual);
}
}
};
logger.info("before initialize client");
AsyncInvocation inv2 = vm2.invokeAsync("Initialize Client", initializeClient);
AsyncInvocation inv3 = vm3.invokeAsync("Initialize Client", initializeClient);
inv2.await();
inv3.await();
}
/**
* Test for client connection loss with CacheLoader Exception on the server.
*/
@Test
public void test004ForCacheLoaderException() throws CacheException {
final String name = this.getName();
VM server = VM.getVM(0);
VM client = VM.getVM(1);
// Create the cache servers with distributed, mirrored region
logger.info("before create server");
server.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
System.out.println("### CALLING CACHE LOADER....");
throw new CacheLoaderException(
"Test for CahceLoaderException causing Client connection to disconnect.");
}
@Override
public void close() {}
};
RegionFactory<Object, Object> factory =
getBridgeServerMirroredAckRegionAttributes(cl);
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server clients
final int numberOfKeys = 10;
final String host0 = NetworkUtils.getServerHostName();
final int[] port =
new int[] {server.invoke(ConnectionPoolDUnitTest::getCacheServerPort)};
final String poolName = "myPool";
logger.info("before create client");
client.invoke("Create Cache Server Client", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPoolWithNameAndFactory(factory, host0, port, true,
-1, -1, null, poolName, PoolManager.createFactory(), -1, -1, -2, -1);
createRegion(name, factory);
});
// Initialize each client with entries (so that afterInvalidate is called)
logger.info("before initialize client");
AsyncInvocation inv2 = client.invokeAsync("Initialize Client", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolStats stats = ((PoolImpl) PoolManager.find(poolName)).getStats();
int oldConnects = stats.getConnects();
int oldDisConnects = stats.getDisConnects();
for (int i = 0; i < numberOfKeys; i++) {
region.get("key-" + i);
}
int newConnects = stats.getConnects();
int newDisConnects = stats.getDisConnects();
// newDisConnects);
if (newConnects != oldConnects && newDisConnects != oldDisConnects) {
fail("New connection has created for Server side CacheLoaderException.");
}
});
ThreadUtils.join(inv2, 30 * 1000);
}
private void validateDS() {
List list = InternalDistributedSystem.getExistingSystems();
if (list.size() > 1) {
logger.info("validateDS: size=" + list.size() + " isDedicatedAdminVM="
+ ClusterDistributionManager.isDedicatedAdminVM() + " l=" + list);
}
assertThat(ClusterDistributionManager.isDedicatedAdminVM()).isFalse();
assertThat(1).isEqualTo(list.size());
}
/**
* Tests the basic operations of the {@link Pool}
*
* @since GemFire 3.5
*/
@Test
public void test006Pool() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(false);
factory.setCacheLoader(new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey().toString();
}
@Override
public void close() {
}
});
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
validateDS();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
});
vm1.invoke("Get values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get(i);
assertThat(String.valueOf(i)).isEqualTo(value);
}
});
vm1.invoke("Update values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, i);
}
});
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
validateDS();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
});
vm2.invoke("Validate values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get(i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(Integer.class);
assertThat(i).isEqualTo(((Integer) value).intValue());
}
});
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
String pName = region.getAttributes().getPoolName();
PoolImpl p = (PoolImpl) PoolManager.find(pName);
assertThat(p.isDestroyed()).isFalse();
assertThat(1).isEqualTo(p.getAttachCount());
try {
p.destroy();
fail("expected IllegalStateException");
} catch (IllegalStateException ignored) {
}
region.localDestroyRegion();
assertThat(p.isDestroyed()).isFalse();
assertThat(p.getAttachCount()).isEqualTo(0);
});
}
/**
* Tests the BridgeServer failover (bug 31832).
*/
@Test
public void test007BridgeServerFailoverCnx1() throws CacheException {
disconnectAllFromDS();
basicTestBridgeServerFailover(1);
}
/**
* Test BridgeServer failover with connectionsPerServer set to 0
*/
@Test
public void test008BridgeServerFailoverCnx0() throws CacheException {
basicTestBridgeServerFailover(0);
}
private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException {
final String name = this.getName();
// Create two cache servers
Stream.of(vm0, vm1).forEach(vm -> vm.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
}));
final int port0 = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
final int port1 = vm1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
// Create one bridge client in this VM
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
String ServerGroup = null;
configureConnectionPoolWithNameAndFactory(factory, host0, new int[] {port0, port1}, true,
-1, cnxCount, ServerGroup, TEST_POOL_NAME, PoolManager.createFactory(), 100, -1, -2,
-1);
Region<Object, Object> region = createRegion(name, factory);
// force connections to form
region.put("keyInit", 0);
region.put("keyInit2", 0);
});
vm2.invoke("verify2Servers", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolImpl pool = getPool(region);
verifyServerCount(pool, 2);
});
final String expected = "java.io.IOException";
final String addExpected =
"<ExpectedException action=add>" + expected + "</ExpectedException>";
final String removeExpected =
"<ExpectedException action=remove>" + expected + "</ExpectedException>";
vm2.invoke(() -> {
LogWriter bgexecLogger = new LocalLogWriter(ALL.intLevel(), System.out);
bgexecLogger.info(addExpected);
});
try { // make sure we removeExpected
// Bounce the non-current server (I know that VM1 contains the non-current server
// because ...
vm1.invoke(() -> stopBridgeServer(getCache()));
vm2.invoke("verify1Server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolImpl pool = getPool(region);
verifyServerCount(pool, 1);
});
vm1.invoke("Restart CacheServer", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
assertThat(region).isNotNull();
startBridgeServer(port1);
});
// Pause long enough for the monitor to realize the server has been bounced
// and reconnect to it.
vm2.invoke("verify2Servers", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolImpl pool = getPool(region);
verifyServerCount(pool, 2);
});
} finally {
vm2.invoke(() -> {
LogWriter bgexecLogger = new LocalLogWriter(ALL.intLevel(), System.out);
bgexecLogger.info(removeExpected);
});
}
// Stop the other cache server
vm0.invoke(() -> stopBridgeServer(getCache()));
// Run awhile
vm2.invoke("verify1Server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolImpl pool = getPool(region);
verifyServerCount(pool, 1);
});
// Close Pool
vm2.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
}
private static volatile boolean stopTestLifetimeExpire = false;
private static volatile int baselineLifetimeCheck;
private static volatile int baselineLifetimeExtensions;
private static volatile int baselineLifetimeConnect;
private static volatile int baselineLifetimeDisconnect;
@Test
public void basicTestLifetimeExpire()
throws CacheException, InterruptedException {
final String name = this.getName();
AsyncInvocation putAI = null;
AsyncInvocation putAI2 = null;
try {
// Create two cache servers
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
factory.addCacheListener(new DelayListener());
createRegion(name, factory);
startBridgeServer(0);
});
final int port0 = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
factory.addCacheListener(new DelayListener());
createRegion(name, factory);
startBridgeServer(0);
});
final int port1 = vm1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
// we only had to stop it to reserve a port
vm1.invoke("Stop CacheServer", () -> stopBridgeServer(getCache()));
// Create one bridge client in this VM
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPoolWithNameAndFactory(factory, host0, new int[] {port0, port1},
false, -1, 0, null, TEST_POOL_NAME, PoolManager.createFactory(), 100, 500, 500, -1);
Region<Object, Object> region = createRegion(name, factory);
// force connections to form
region.put("keyInit", 0);
region.put("keyInit2", 0);
});
// Launch async thread that puts objects into cache. This thread will execute until
// the test has ended.
putAI = vm2.invokeAsync("Put objects", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolImpl pool = getPool(region);
PoolStats stats = pool.getStats();
baselineLifetimeCheck = stats.getLoadConditioningCheck();
baselineLifetimeExtensions = stats.getLoadConditioningExtensions();
baselineLifetimeConnect = stats.getLoadConditioningConnect();
baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect();
try {
int count = 0;
while (!stopTestLifetimeExpire) {
count++;
region.put("keyAI1", count);
}
} catch (NoAvailableServersException ex) {
if (!stopTestLifetimeExpire) {
throw ex;
}
}
});
putAI2 = vm2.invokeAsync("Put objects", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
try {
int count = 0;
while (!stopTestLifetimeExpire) {
count++;
region.put("keyAI2", count);
}
} catch (NoAvailableServersException ex) {
if (!stopTestLifetimeExpire) {
throw ex;
}
}
});
vm2.invoke("verify1Server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolImpl pool = getPool(region);
final PoolStats stats = pool.getStats();
verifyServerCount(pool, 1);
await().until(() -> stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck));
// make sure no replacements are happening.
// since we have 2 threads and 2 cnxs and 1 server
// when lifetimes are up we should only want to connect back to the
// server we are already connected to and thus just extend our lifetime
assertThat(stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck))
.describedAs("baselineLifetimeCheck=" + baselineLifetimeCheck
+ " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck())
.isTrue();
baselineLifetimeCheck = stats.getLoadConditioningCheck();
assertThat(stats.getLoadConditioningExtensions())
.isGreaterThan(baselineLifetimeExtensions);
assertThat(stats.getLoadConditioningConnect()).isEqualTo(baselineLifetimeConnect);
assertThat(stats.getLoadConditioningDisconnect()).isEqualTo(baselineLifetimeDisconnect);
});
await().until(putAI::isAlive);
await().until(putAI2::isAlive);
} finally {
vm2.invoke("Stop Putters", () -> stopTestLifetimeExpire = true);
try {
if (putAI != null) {
// Verify that no exception has occurred in the putter thread
putAI.await();
}
if (putAI2 != null) {
// Verify that no exception has occurred in the putter thread
putAI.await();
}
} finally {
vm2.invoke("Stop Putters", () -> stopTestLifetimeExpire = false);
// Close Pool
vm2.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
String poolName = region.getAttributes().getPoolName();
region.localDestroyRegion();
PoolManager.find(poolName).destroy();
});
}
}
}
/**
* Tests the create operation of the {@link Pool}
*
* @since GemFire 3.5
*/
@Test
public void test011PoolCreate() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, false, -1, -1, null);
createRegion(name, factory);
});
vm1.invoke("Create values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.create(i, i);
}
});
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, false, -1, -1, null);
createRegion(name, factory);
});
vm2.invoke("Validate values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get(i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(Integer.class);
assertThat(i).isEqualTo(((Integer) value).intValue());
}
});
SerializableRunnable close = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}
};
vm1.invoke("Close Pool", close);
vm2.invoke("Close Pool", close);
}
/**
* Tests the put operation of the {@link Pool}
*
* @since GemFire 3.5
*/
@Test
public void test012PoolPut() throws CacheException {
final String name = this.getName();
vm0 = VM.getVM(0);
vm1 = VM.getVM(1);
vm2 = VM.getVM(2);
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
SerializableRunnable createRegion = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, false, -1, -1, null);
createRegion(name, factory);
}
};
vm1.invoke("Create region", createRegion);
vm1.invoke("Put values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
// put string values
region.put("key-string-" + i, "value-" + i);
// put object values
Order order = new Order();
order.init(i);
region.put("key-object-" + i, order);
// put byte[] values
region.put("key-bytes-" + i, ("value-" + i).getBytes());
}
});
vm2.invoke("Create region", createRegion);
vm2.invoke("Get / validate string values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get("key-string-" + i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(String.class);
assertThat("value-" + i).isEqualTo(value);
}
});
vm2.invoke("Get / validate object values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get("key-object-" + i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(Order.class);
assertThat(i).isEqualTo(((Order) value).getIndex());
}
});
vm2.invoke("Get / validate byte[] values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get("key-bytes-" + i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(byte[].class);
assertThat("value-" + i).isEqualTo(new String((byte[]) value));
}
});
SerializableRunnable closePool = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}
};
vm1.invoke(closePool);
vm2.invoke(closePool);
}
/**
* Tests the put operation of the {@link Pool}
*
* @since GemFire 3.5
*/
@Test
public void test013PoolPutNoDeserialize() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
SerializableRunnable createRegion = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, false, -1, -1, null);
createRegion(name, factory);
}
};
vm1.invoke("Create Region", createRegion);
vm1.invoke("Put values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
// put string values
region.put("key-string-" + i, "value-" + i);
// put object values
Order order = new Order();
order.init(i);
region.put("key-object-" + i, order);
// put byte[] values
region.put("key-bytes-" + i, ("value-" + i).getBytes());
}
});
vm2.invoke("Create Region", createRegion);
vm2.invoke("Get / validate string values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get("key-string-" + i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(String.class);
assertThat("value-" + i).isEqualTo(value);
}
});
vm2.invoke("Get / validate object values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get("key-object-" + i);
assertThat(value).isNotNull();
assertThat(value).isInstanceOf(Order.class);
assertThat(i).isEqualTo(((Order) value).getIndex());
}
});
vm2.invoke("Get / validate byte[] values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object value = region.get("key-bytes-" + i);
assertThat(value).isNotNull();
assertThat(value instanceof byte[]).isTrue();
assertThat("value-" + i).isEqualTo(new String((byte[]) value));
}
});
SerializableRunnable closePool = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}
};
vm1.invoke("Close Pool", closePool);
vm2.invoke("Close Pool", closePool);
}
private <K, V> List<CacheEvent<K, V>> assertForOpCount(String regionName, Operation operation,
int count) {
Region<K, V> region = getRootRegion().getSubregion(regionName);
CertifiableTestCacheListener<K, V> ctl =
(CertifiableTestCacheListener<K, V>) region.getAttributes()
.getCacheListeners()[0];
List<CacheEvent<K, V>> list = new ArrayList<>(ctl.getEventHistory());
await().until(() -> {
list.addAll(ctl.getEventHistory());
return list.size() >= count;
});
int countOfOps = 0;
for (CacheEvent<K, V> cacheEvent : list) {
if (cacheEvent.getOperation() == operation) {
countOfOps++;
} else {
logger.info("assertForOpCount: Got an unexpected message " + cacheEvent);
}
}
assertThat(countOfOps).isEqualTo(count);
assertThat(countOfOps)
.describedAs("assertForOpCount: There were excess operations in the list " + list)
.isEqualTo(list.size());
return list;
}
/**
* Tests that invalidates and destroys are propagated to {@link Pool}s.
*
* @since GemFire 3.5
*/
@Test
public void test014InvalidateAndDestroyPropagation() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
}
});
vm1.invoke("Populate region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, "old" + i);
}
});
vm2.invoke("Create region", new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
}
});
vm1.invoke("Turn on history", () -> {
await().until(() -> getRootRegion().getSubregion(name) != null);
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
ctl.enableEventHistory();
});
vm2.invoke("Update region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, "new" + i, "callbackArg" + i);
}
});
vm1.invoke("Verify invalidates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
for (int i = 0; i < 10; i++) {
Object key = i;
ctl.waitForInvalidated(key);
Region.Entry entry = region.getEntry(key);
assertThat(entry).isNotNull();
assertThat(entry.getValue()).isNull();
}
List<CacheEvent<Object, Object>> list = assertForOpCount(name, Operation.INVALIDATE, 10);
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat("old" + i).isEqualTo(ee.getOldValue());
assertThat("callbackArg" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isTrue();
}
});
vm2.invoke("Validate original and destroy", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = i;
assertThat(region.getEntry(key).getValue()).isEqualTo("new" + i);
region.destroy(key, "destroyCB" + i);
assertThat(region.getEntry(key)).isNull();
}
});
vm1.invoke("Verify destroys", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
for (int i = 0; i < 10; i++) {
Object key = i;
ctl.waitForDestroyed(key);
Region.Entry entry = region.getEntry(key);
assertThat(entry).isNull();
}
List<CacheEvent<Object, Object>> list = assertForOpCount(name, Operation.DESTROY, 10);
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat("destroyCB" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isTrue();
}
});
vm2.invoke("recreate", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = i;
region.create(key, "recreate" + i, "recreateCB" + i);
}
});
vm1.invoke("Verify Local Load Creates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
await().untilAsserted(() -> {
for (int i = 0; i < 10; i++) {
Object key = i;
assertThat(region.containsKeyOnServer(key)).isTrue();
}
});
for (int i = 0; i < 10; i++) {
Object key = i;
region.get(key, "recreateCB");
}
List<CacheEvent<Object, Object>> list =
assertForOpCount(name, Operation.LOCAL_LOAD_CREATE, 10);
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat(ee.getNewValue()).isEqualTo("recreate" + i);
assertThat(ee.isOriginRemote()).isFalse();
}
});
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
vm2.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
}
/**
* Tests that invalidates and destroys are propagated to {@link Pool}s correctly to
* DataPolicy.EMPTY + InterestPolicy.ALL
*
* @since GemFire 5.0
*/
@Test
public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
});
vm1.invoke("Populate region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, "old" + i);
}
});
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
});
vm1.invoke("Turn on history", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
ctl.enableEventHistory();
});
vm2.invoke("Update region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, "new" + i, "callbackArg" + i);
}
});
vm1.invoke("Verify invalidates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
for (int i = 0; i < 10; i++) {
Object key = i;
ctl.waitForInvalidated(key);
await().until(() -> region.getEntry(key) == null);
}
{
List<CacheEvent<Object, Object>> list = ctl.getEventHistory();
assertThat(list.size()).isEqualTo(10);
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat(ee.isOldValueAvailable()).isFalse(); // failure
assertThat(Operation.INVALIDATE).isEqualTo(ee.getOperation());
assertThat("callbackArg" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isTrue();
}
}
});
vm2.invoke("Validate original and destroy", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = i;
assertThat("new" + i).isEqualTo(region.getEntry(key).getValue());
region.destroy(key, "destroyCB" + i);
}
});
vm1.invoke("Verify destroys", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
for (int i = 0; i < 10; i++) {
Object key = i;
ctl.waitForDestroyed(key);
await().until(() -> region.getEntry(key) == null);
}
{
List<CacheEvent<Object, Object>> list = ctl.getEventHistory();
assertThat(10).isEqualTo(list.size());
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat(ee.isOldValueAvailable()).isFalse();
assertThat(Operation.DESTROY).isEqualTo(ee.getOperation());
assertThat("destroyCB" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isTrue();
}
}
});
vm2.invoke("recreate", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = i;
region.create(key, "create" + i, "createCB" + i);
}
});
vm1.invoke("Verify creates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
for (int i = 0; i < 10; i++) {
Object key = i;
ctl.waitForInvalidated(key);
await().until(() -> region.getEntry(key) == null);
}
List<CacheEvent<Object, Object>> list = ctl.getEventHistory();
assertThat(10).isEqualTo(list.size());
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat(ee.isOldValueAvailable()).isFalse();
assertThat(Operation.INVALIDATE).isEqualTo(ee.getOperation());
assertThat("createCB" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isTrue();
}
// now see if we can get it from the server
for (int i = 0; i < 10; i++) {
Object key = i;
assertThat("create" + i).isEqualTo(region.get(key, "loadCB" + i));
}
list = ctl.getEventHistory();
assertThat(10).isEqualTo(list.size());
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat(ee.getNewValue()).isEqualTo("create" + i);
assertThat(Operation.LOCAL_LOAD_CREATE).isEqualTo(ee.getOperation());
assertThat("loadCB" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isFalse();
}
});
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Tests that invalidates and destroys are propagated to {@link Pool}s correctly to
* DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT
*
* @since GemFire 5.0
*/
@Test
public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
factory.setDataPolicy(DataPolicy.EMPTY);
factory
.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
});
vm1.invoke("Populate region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, "old" + i);
}
});
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
});
vm1.invoke("Turn on history", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
ctl.enableEventHistory();
});
vm2.invoke("Update region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, "new" + i, "callbackArg" + i);
}
});
vm1.invoke("Verify invalidates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
await().until(() -> ctl.getEventHistory().size() == 0);
});
vm2.invoke("Validate original and destroy", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = i;
assertThat("new" + i).isEqualTo(region.getEntry(key).getValue());
region.destroy(key, "destroyCB" + i);
}
});
vm1.invoke("Verify destroys", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
await().until(() -> ctl.getEventHistory().size() == 0);
});
vm2.invoke("recreate", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = i;
region.create(key, "create" + i, "createCB" + i);
}
});
vm1.invoke("Verify creates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
await().until(() -> ctl.getEventHistory().size() == 0);
List<CacheEvent<Object, Object>> list = ctl.getEventHistory();
assertThat(list).isEmpty();
// now see if we can get it from the server
for (int i = 0; i < 10; i++) {
Object key = i;
assertThat("create" + i).isEqualTo(region.get(key, "loadCB" + i));
}
list = ctl.getEventHistory();
assertThat(10).isEqualTo(list.size());
for (int i = 0; i < 10; i++) {
Object key = i;
EntryEvent ee = (EntryEvent) list.get(i);
assertThat(ee.getKey()).isEqualTo(key);
assertThat(ee.getOldValue()).isNull();
assertThat(ee.getNewValue()).isEqualTo("create" + i);
assertThat(Operation.LOCAL_LOAD_CREATE).isEqualTo(ee.getOperation());
assertThat("loadCB" + i).isEqualTo(ee.getCallbackArgument());
assertThat(ee.isOriginRemote()).isFalse();
}
});
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Tests interest key registration.
*/
@Test
public void test017ExpireDestroyHasEntryInCallback() throws CacheException {
disconnectAllFromDS();
final String name = this.getName();
// Create cache server
vm0.invoke("Create Cache Server", () -> {
// In lieu of System.setProperty("gemfire.EXPIRE_SENDS_ENTRY_AS_CALLBACK", "true");
EntryExpiryTask.expireSendsEntryAsCallback = true;
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
factory.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.DESTROY));
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server clients
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes((InterestPolicy.ALL)));
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new CertifiableTestCacheListener<>());
Region<Object, Object> region = createRegion(name, factory);
region.registerInterest("ALL_KEYS");
});
vm1.invoke("Turn on history", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
ctl.enableEventHistory();
});
// Create some entries on the client
vm1.invoke("Create entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 5; i++) {
region.put("key-client-" + i, "value-client-" + i);
}
});
// Create some entries on the server
vm0.invoke("Create entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 5; i++) {
region.put("key-server-" + i, "value-server-" + i);
}
});
vm1.invoke("Validate listener events", () -> {
AtomicInteger destroyCallbacks = new AtomicInteger(10);
Region<Object, Object> region = getRootRegion().getSubregion(name);
final CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) region.getAttributes()
.getCacheListeners()[0];
await().until(() -> {
List<CacheEvent<Object, Object>> list = ctl.getEventHistory();
for (CacheEvent ce : list) {
if (ce.getOperation() == Operation.DESTROY
&& ce.getCallbackArgument() instanceof String) {
destroyCallbacks.decrementAndGet();
}
}
return destroyCallbacks.get() == 0;
});
});
// Close cache server clients
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
// Stop cache server
}
private <K, V> RegionFactory<K, V> getBridgeServerRegionAttributes(CacheLoader<K, V> cl,
CacheWriter<K, V> cw) {
RegionFactory<K, V> regionFactory = getCache().createRegionFactory();
if (cl != null) {
regionFactory.setCacheLoader(cl);
}
if (cw != null) {
regionFactory.setCacheWriter(cw);
}
regionFactory.setScope(Scope.DISTRIBUTED_ACK);
regionFactory.setConcurrencyChecksEnabled(false);
return regionFactory;
}
private <K, V> RegionFactory<K, V> getBridgeServerMirroredAckRegionAttributes(
CacheLoader<K, V> cl) {
RegionFactory<K, V> regionFactory = getCache().createRegionFactory();
if (cl != null) {
regionFactory.setCacheLoader(cl);
}
regionFactory.setScope(Scope.DISTRIBUTED_ACK);
regionFactory.setConcurrencyChecksEnabled(false);
regionFactory.setDataPolicy(DataPolicy.REPLICATE);
return regionFactory;
}
/**
* Tests that updates are not sent to VMs that did not ask for them.
*/
@Test
public void test018OnlyRequestedUpdates() {
final String name1 = this.getName() + "-1";
final String name2 = this.getName() + "-2";
// Cache server serves up both regions
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name1, factory);
createRegion(name2, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// vm1 sends updates to the server
vm1.invoke("Create regions", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
Region rgn = createRegion(name1, factory);
rgn.registerInterestRegex(".*", false, false);
rgn = createRegion(name2, factory);
rgn.registerInterestRegex(".*", false, false);
});
// vm2 only wants updates to updates to region1
vm2.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
Region rgn = createRegion(name1, factory);
rgn.registerInterestRegex(".*", false, false);
createRegion(name2, factory);
// no interest registration for region 2
});
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Populate region", () -> {
Region<Object, Object> region1 = getRootRegion().getSubregion(name1);
for (int i = 0; i < 10; i++) {
region1.put(i, "Region1Old" + i);
}
Region<Object, Object> region2 = getRootRegion().getSubregion(name2);
for (int i = 0; i < 10; i++) {
region2.put(i, "Region2Old" + i);
}
}));
vm1.invoke("Update", () -> {
Region<Object, Object> region1 = getRootRegion().getSubregion(name1);
for (int i = 0; i < 10; i++) {
region1.put(i, "Region1New" + i);
}
Region<Object, Object> region2 = getRootRegion().getSubregion(name2);
for (int i = 0; i < 10; i++) {
region2.put(i, "Region2New" + i);
}
});
// Wait for updates to be propagated
vm2.invoke("Validate", () -> {
Region<Object, Object> region1 = getRootRegion().getSubregion(name1);
for (int i = 0; i < 10; i++) {
final int testInt = i;
await().until(() -> (region1.get(testInt).equals("Region1New" + testInt)));
}
Region<Object, Object> region2 = getRootRegion().getSubregion(name2);
for (int i = 0; i < 10; i++) {
final int testInt = i;
await().until(() -> region2.get(testInt).equals(("Region2Old" + testInt)));
}
});
vm1.invoke("Close Pool", () -> {
// Terminate region1's Pool
Region<Object, Object> region1 = getRootRegion().getSubregion(name1);
region1.localDestroyRegion();
// Terminate region2's Pool
Region<Object, Object> region2 = getRootRegion().getSubregion(name2);
region2.localDestroyRegion();
});
vm2.invoke("Close Pool", () -> {
// Terminate region1's Pool
Region<Object, Object> region1 = getRootRegion().getSubregion(name1);
region1.localDestroyRegion();
});
}
/**
* Tests interest key registration.
*/
@Test
public void test019InterestKeyRegistration() throws CacheException {
final String name = this.getName();
// Create cache server
vm0.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(cl, null);
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server clients
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
}));
// Get values for key 1 and key 2 so that there are entries in the clients.
// Register interest in one of the keys.
vm1.invoke("Create Entries and Register Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
assertThat(region.get("key-1")).isEqualTo("key-1");
assertThat(region.get("key-2")).isEqualTo("key-2");
region.registerInterest("key-1");
});
vm2.invoke("Create Entries and Register Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
assertThat(region.get("key-1")).isEqualTo("key-1");
assertThat(region.get("key-2")).isEqualTo("key-2");
region.registerInterest("key-2");
});
// Put new values and validate updates (VM1)
vm1.invoke("Put New Values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "vm1-key-1");
region.put("key-2", "vm1-key-2");
// Verify that no invalidates occurred to this region
assertThat(region.getEntry("key-1").getValue()).isEqualTo("vm1-key-1");
assertThat(region.getEntry("key-2").getValue()).isEqualTo("vm1-key-2");
});
vm2.invoke("Validate Entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// Verify that 'key-2' was updated, but 'key-1' was not
// and contains the original value
await().until(() -> region.getEntry("key-1").getValue().equals("key-1"));
await().until(() -> region.getEntry("key-2").getValue().equals("vm1-key-2"));
});
// Put new values and validate updates (VM2)
vm2.invoke("Put New Values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "vm2-key-1");
region.put("key-2", "vm2-key-2");
// Verify that no updates occurred to this region
assertThat(region.getEntry("key-1").getValue()).isEqualTo("vm2-key-1");
assertThat(region.getEntry("key-2").getValue()).isEqualTo("vm2-key-2");
});
vm1.invoke("Validate Entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// Verify that 'key-1' was updated, but 'key-2' was not
// and contains the original value
await().until(() -> region.getEntry("key-2").getValue().equals("vm1-key-2"));
await().until(() -> region.getEntry("key-1").getValue().equals("vm2-key-1"));
});
// Unregister interest
vm1.invoke("Unregister Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.unregisterInterest("key-1");
});
vm2.invoke("Unregister Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.unregisterInterest("key-2");
});
// Put new values and validate updates (VM1)
vm1.invoke("Put New Values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "vm1-key-1-again");
region.put("key-2", "vm1-key-2-again");
// Verify that no updates occurred to this region
assertThat(region.getEntry("key-1").getValue()).isEqualTo("vm1-key-1-again");
assertThat(region.getEntry("key-2").getValue()).isEqualTo("vm1-key-2-again");
});
vm2.invoke("Validate Entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// Verify that neither 'key-1' 'key-2' was updated
// and contain the original value
await().until(() -> region.getEntry("key-1").getValue().equals("vm2-key-1"));
await().until(() -> region.getEntry("key-2").getValue().equals("vm2-key-2"));
});
// Put new values and validate updates (VM2)
vm2.invoke("Put New Values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "vm2-key-1-again");
region.put("key-2", "vm2-key-2-again");
// Verify that no updates occurred to this region
assertThat(region.getEntry("key-1").getValue()).isEqualTo("vm2-key-1-again");
assertThat(region.getEntry("key-2").getValue()).isEqualTo("vm2-key-2-again");
});
vm1.invoke("Validate Entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// Verify that neither 'key-1' 'key-2' was updated
// and contain the original value
await().until(
() -> "vm1-key-1-again".compareTo((String) region.getEntry("key-1").getValue()) == 0);
await().until(() -> region.getEntry("key-2").getValue() == "vm1-key-2-again");
});
// Unregister interest again (to verify that a client can unregister interest
// in a key that its not interested in with no problem.
vm1.invoke("Unregister Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.unregisterInterest("key-1");
});
vm2.invoke("Unregister Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.unregisterInterest("key-2");
});
// Close cache server clients
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
// Stop cache server
}
/**
* Tests interest list registration.
*/
@Test
public void test020InterestListRegistration() throws CacheException {
final String name = this.getName();
// Create cache server
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> regionFactory = getCache().createRegionFactory();
regionFactory.setScope(Scope.DISTRIBUTED_ACK);
regionFactory.setConcurrencyChecksEnabled(false);
Region<Object, Object> region = createRegion(name, regionFactory);
startBridgeServer(0);
for (int i = 0; i <= 6; i++) {
region.put("key-" + i, "key-" + i);
}
});
// Create cache server clients
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
}));
// Get values for key 1 and key 6 so that there are entries in the clients.
// Register interest in a list of keys.
vm1.invoke("Create Entries and Register Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
assertThat(region.get("key-1")).isEqualTo("key-1");
assertThat(region.get("key-6")).isEqualTo("key-6");
List<Object> list = new ArrayList<>();
list.add("key-1");
list.add("key-2");
list.add("key-3");
list.add("key-4");
list.add("key-5");
region.registerInterest(list);
});
vm2.invoke("Create Entries and Register Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
assertThat(region.get("key-1")).isEqualTo("key-1");
assertThat(region.get("key-6")).isEqualTo("key-6");
});
// Put new values and validate updates (VM2)
vm2.invoke("Put New Values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "vm2-key-1");
region.put("key-6", "vm2-key-6");
// Verify that no updates occurred to this region
assertThat(region.getEntry("key-1").getValue()).isEqualTo("vm2-key-1");
assertThat(region.getEntry("key-6").getValue()).isEqualTo("vm2-key-6");
});
vm1.invoke("Validate Entries", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// Verify that 'key-1' was updated
await().until(() -> region.getEntry("key-1").getValue().equals("vm2-key-1"));
// Verify that 'key-6' was not invalidated
await().until(() -> region.getEntry("key-6").getValue().equals("key-6"));
});
// Close cache server clients
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
vm2.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
// Stop cache server
}
static class ConnectionPoolDUnitTestSerializable2 implements java.io.Serializable {
protected ConnectionPoolDUnitTestSerializable2(String key) {
_key = key;
}
public String getKey() {
return _key;
}
final String _key;
}
/**
* Accessed by reflection DO NOT REMOVE
*/
private static int getCacheServerPort() {
return bridgeServerPort;
}
private static long getNumberOfAfterCreates() {
return numberOfAfterCreates;
}
private static long getNumberOfAfterUpdates() {
return numberOfAfterUpdates;
}
private static long getNumberOfAfterInvalidates() {
return numberOfAfterInvalidates;
}
/**
* Creates a "loner" distributed system that has dynamic region creation enabled.
*
* @since GemFire 4.3
*/
private void createDynamicRegionCache(String connectionPoolName) {
// note that clients use non-persistent dr factories.
DynamicRegionFactory.get()
.open(new DynamicRegionFactory.Config(null, connectionPoolName, false, true));
logger.info("CREATED IT");
getCache();
}
/**
* A handy method to poll for arrival of non-null/non-invalid entries
*
* @param r the Region to poll
* @param key the key of the Entry to poll for
*/
private static void waitForEntry(final Region r, final Object key) {
await().alias("Waiting for entry " + key + " on region " + r)
.until(() -> r.containsValueForKey(key));
}
private static <K, V> Region<K, V> waitForSubRegion(final Region<K, V> r,
final String subRegName) {
await().alias("Waiting for subregion " + subRegName)
.until(() -> r.getSubregion(subRegName) != null);
return r.getSubregion(subRegName);
}
@Test
public void test021ClientGetOfInvalidServerEntry() throws CacheException {
final String regionName1 = this.getName() + "-1";
VM server1 = VM.getVM(0);
VM client = VM.getVM(2);
// Create server1.
server1.invoke("Create Cache Server and values", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(false);
createRegion(regionName1, factory);
startBridgeServer(0);
Region<Object, Object> region1 = getRootRegion().getSubregion(regionName1);
// create it invalid
region1.create("key-string-1", null);
});
final int port = server1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// now try it with a local scope
client.invoke("Create region 2 and get values on client", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, false, -1, -1,
null);
createRegion(regionName1, factory);
Region<Object, Object> region1 = getRootRegion().getSubregion(regionName1);
assertThat(region1.getEntry("key-string-1")).isNull();
assertThat(region1.get("key-string-1")).isNull();
});
}
@Test
public void test022ClientRegisterUnregisterRequests() throws CacheException {
final String regionName1 = this.getName() + "-1";
VM server1 = vm0;
VM client = vm2;
// Create server1.
server1.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(false);
createRegion(regionName1, factory);
startBridgeServer(0);
});
final int port = server1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
// Create client.
client.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1,
null);
Region<Object, Object> region11 = createRegion(regionName1, factory);
region11.getAttributesMutator().addCacheListener(new CertifiableTestCacheListener<>());
});
// Init values at server.
server1.invoke("Create values", () -> {
Region<Object, Object> region1 = getRootRegion().getSubregion(regionName1);
for (int i = 0; i < 20; i++) {
region1.put("key-string-" + i, "value-" + i);
}
});
// Put some values on the client.
client.invoke("Put values client and close pool", () -> {
Region<Object, Object> region1 = getRootRegion().getSubregion(regionName1);
for (int i = 0; i < 10; i++) {
region1.put("key-string-" + i, "client-value-" + i);
}
String pName = region1.getAttributes().getPoolName();
region1.localDestroyRegion();
PoolImpl p = (PoolImpl) PoolManager.find(pName);
p.destroy();
});
server1.invoke("validate Client Register UnRegister", () -> {
for (CacheServer cacheServer : getCache().getCacheServers()) {
InternalCacheServer bsi = (InternalCacheServer) cacheServer;
final CacheClientNotifierStats ccnStats =
bsi.getAcceptor().getCacheClientNotifier().getStats();
await().until(() -> ccnStats.getClientRegisterRequests() == ccnStats
.getClientUnRegisterRequests());
assertThat(ccnStats.getClientRegisterRequests())
.describedAs("HealthMonitor Client Register/UnRegister mismatch.")
.isEqualTo(ccnStats.getClientUnRegisterRequests());
}
});
}
/**
* Tests the containsKeyOnServer operation of the {@link Pool}
*
* @since GemFire 5.0.2
*/
@Test
public void test023ContainsKeyOnServer() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(false);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, false, -1, -1, null);
createRegion(name, factory);
}));
final Integer key1 = 0;
final String key2 = "0";
vm2.invoke("Contains key on server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
boolean containsKey;
containsKey = region.containsKeyOnServer(key1);
assertThat(containsKey).isFalse();
containsKey = region.containsKeyOnServer(key2);
assertThat(containsKey).isFalse();
});
vm1.invoke("Put values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put(0, 0);
region.put("0", "0");
});
vm2.invoke("Contains key on server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
boolean containsKey = region.containsKeyOnServer(key1);
assertThat(containsKey).isTrue();
containsKey = region.containsKeyOnServer(key2);
assertThat(containsKey).isTrue();
});
for (VM vm : Arrays.asList(vm1, vm2)) {
vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
}
}
/**
* Tests that invoking {@link Region#create} with a <code>null</code> value does the right thing
* with the {@link Pool}.
*
* @since GemFire 3.5
*/
@Test
public void test024CreateNullValue() throws CacheException {
final String name = this.getName();
final Object createCallbackArg = "CREATE CALLBACK ARG";
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
}));
vm2.invoke("Create nulls", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.create(i, null, createCallbackArg);
}
});
vm2.invoke("Verify invalidates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Region.Entry entry = region.getEntry(i);
await().until(() -> entry != null);
await().until(() -> entry.getValue() == null);
}
});
vm1.invoke("Attempt to create values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.create(i, "new" + i);
}
});
vm2.invoke("Verify invalidates", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Region.Entry entry = region.getEntry(i);
await().until(() -> entry != null);
await().until(() -> entry.getValue() == null);
}
});
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Tests that a {@link Region#localDestroy} is not propagated to the server and that a {@link
* Region#destroy} is. Also makes sure that callback arguments are passed correctly.
*/
@Test
public void test025Destroy() throws CacheException {
final String name = this.getName();
final Object callbackArg = "DESTROY CALLBACK";
vm0.invoke("Create Cache Server", () -> {
CacheWriter<Object, Object> cw = new TestCacheWriter<Object, Object>() {
@Override
public void beforeCreate2(EntryEvent event) throws CacheWriterException {
}
@Override
public void beforeDestroy2(EntryEvent event) throws CacheWriterException {
Object beca = event.getCallbackArgument();
assertThat(callbackArg).isEqualTo(beca);
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, cw);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create and Load region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, String.valueOf(i));
}
});
vm2.invoke("Create and Load region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
Region rgn = createRegion(name, factory);
rgn.registerInterestRegex(".*", false, false);
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
assertThat(String.valueOf(i)).isEqualTo(region.get(i));
}
});
vm1.invoke("Local destroy", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.localDestroy(i);
}
});
vm2.invoke("No destroy propagate", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
assertThat(String.valueOf(i)).isEqualTo(region.get(i));
}
});
vm1.invoke("Fetch from server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
assertThat(String.valueOf(i)).isEqualTo(region.get(i));
}
});
vm0.invoke("Check no server cache writer", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
TestCacheWriter writer = getTestWriter(region);
writer.wasInvoked();
});
vm1.invoke(() -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.destroy(i, callbackArg);
}
for (int i = 0; i < 10; i++) {
final int testInt = i;
await().until(() -> region.getEntry(testInt) == null);
}
});
vm2.invoke("Validate destroy propagate", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
final int testInt = i;
await().until(() -> region.getEntry(testInt) == null);
}
});
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Tests that a {@link Region#localDestroyRegion} is not propagated to the server and that a
* {@link Region#destroyRegion} is. Also makes sure that callback arguments are passed correctly.
*/
@Ignore("TODO")
@Test
public void testDestroyRegion() throws CacheException {
final String name = this.getName();
final Object callbackArg = "DESTROY CALLBACK";
vm0.invoke("Create Cache Server", () -> {
CacheWriter<Object, Object> cw = new TestCacheWriter<Object, Object>() {
@Override
public void beforeCreate2(EntryEvent event) throws CacheWriterException {
}
@Override
public void beforeRegionDestroy2(RegionEvent event) throws CacheWriterException {
assertThat(callbackArg).isEqualTo(event.getCallbackArgument());
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, cw);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
SerializableRunnable create = new CacheSerializableRunnable() {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
}
};
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", create));
vm1.invoke("Local destroy region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
assertThat(getRootRegion().getSubregion(name)).isNull();
});
vm2.invoke("No destroy propagate", () -> {
assertThat(getRootRegion().getSubregion(name)).isNotNull();
});
vm0.invoke("Check no server cache writer", () -> {
TestCacheWriter writer = getTestWriter(getRootRegion().getSubregion(name));
writer.wasInvoked();
});
vm1.invoke(create);
vm1.invoke("Distributed destroy region", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
assertThat(region).isNotNull();
region.destroyRegion(callbackArg);
assertThat(getRootRegion().getSubregion(name)).isNull();
});
vm2.invoke("Verify destroy propagate",
() -> await().until(() -> getRootRegion().getSubregion(name) == null));
}
/**
* Tests interest list registration with callback arg with DataPolicy.EMPTY and
* InterestPolicy.ALL
*/
@Test
public void test026DPEmptyInterestListRegistrationWithCallbackArg() throws CacheException {
final String name = this.getName();
// Create cache server
vm0.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(cl, null);
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server clients
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new ControlListener());
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
createRegion(name, factory);
});
vm2.invoke("Create publisher region", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1,
null);
factory.addCacheListener(new ControlListener());
factory.setDataPolicy(DataPolicy.EMPTY); // make sure empty works with client publishers
createRegion(name, factory);
});
// VM1 Register interest
vm1.invoke("Create Entries and Register Interest", () -> getRootRegion().getSubregion(name)
.registerInterest("key-1", InterestResultPolicy.NONE));
// VM2 Put entry (this will cause a create event in both VM1 and VM2)
vm2.invoke(() -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.create("key-1", "key-1-create", "key-1-create");
region.put("key-1", "key-1-update", "key-1-update");
region.destroy("key-1", "key-1-destroy");
});
vm1.invoke("Verify events", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0];
int eventCount = 3;
listener.waitWhileNotEnoughEvents(eventCount);
assertThat(eventCount).isEqualTo(listener.events.size());
{
EventWrapper ew = listener.events.get(0);
assertThat(TYPE_CREATE).isEqualTo(ew.type);
Object key = "key-1";
assertThat(key).isEqualTo(ew.event.getKey());
assertThat(ew.event.getOldValue()).isNull();
assertThat(ew.event.isOldValueAvailable()).isFalse(); // failure
assertThat("key-1-create").isEqualTo(ew.event.getNewValue());
assertThat(Operation.CREATE).isEqualTo(ew.event.getOperation());
assertThat("key-1-create").isEqualTo(ew.event.getCallbackArgument());
assertThat(ew.event.isOriginRemote()).isTrue();
ew = listener.events.get(1);
assertThat(TYPE_UPDATE).isEqualTo(ew.type);
assertThat(key).isEqualTo(ew.event.getKey());
assertThat(ew.event.getOldValue()).isNull();
assertThat(ew.event.isOldValueAvailable()).isFalse();
assertThat("key-1-update").isEqualTo(ew.event.getNewValue());
assertThat(Operation.UPDATE).isEqualTo(ew.event.getOperation());
assertThat("key-1-update").isEqualTo(ew.event.getCallbackArgument());
assertThat(ew.event.isOriginRemote()).isTrue();
ew = listener.events.get(2);
assertThat(TYPE_DESTROY).isEqualTo(ew.type);
assertThat("key-1-destroy").isEqualTo(ew.arg);
assertThat(key).isEqualTo(ew.event.getKey());
assertThat(ew.event.getOldValue()).isNull();
assertThat(ew.event.isOldValueAvailable()).isFalse();
assertThat(ew.event.getNewValue()).isNull();
assertThat(Operation.DESTROY).isEqualTo(ew.event.getOperation());
assertThat("key-1-destroy").isEqualTo(ew.event.getCallbackArgument());
assertThat(ew.event.isOriginRemote()).isTrue();
}
});
Stream.of(vm1, vm2).forEach(vm -> {
// Close cache server clients
vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
});
}
/**
* Tests interest list registration with callback arg with DataPolicy.EMPTY and
* InterestPolicy.CACHE_CONTENT
*/
@Test
public void test027DPEmptyCCInterestListRegistrationWithCallbackArg() throws CacheException {
final String name = this.getName();
// Create cache server
vm0.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(cl, null);
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server clients
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new ControlListener());
factory.setDataPolicy(DataPolicy.EMPTY);
factory
.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
createRegion(name, factory);
});
vm2.invoke("Create publisher region", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1,
null);
factory.addCacheListener(new ControlListener());
factory.setDataPolicy(DataPolicy.EMPTY); // make sure empty works with client publishers
createRegion(name, factory);
});
// VM1 Register interest
vm1.invoke("Create Entries and Register Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// This call will cause no value to be put into the region
region.registerInterest("key-1", InterestResultPolicy.NONE);
});
// VM2 Put entry (this will cause a create event in both VM1 and VM2)
vm2.invoke("Put Value", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.create("key-1", "key-1-create", "key-1-create");
});
// VM2 Put entry (this will cause an update event in both VM1 and VM2)
vm2.invoke("Put Value", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "key-1-update", "key-1-update");
});
// VM2 Destroy entry (this will cause a destroy event)
vm2.invoke("Destroy Entry", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.destroy("key-1", "key-1-destroy");
});
vm1.invoke("Verify events", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0];
await().until(() -> listener.events.size() == 0);
});
// Close cache server clients
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Test dynamic region creation instantiated from a bridge client causing regions to be created on
* two different cache servers.
* <p>
* Also tests the reverse situation, a dynamic region is created on the cache server expecting the
* same region to be created on the client.
* <p>
* Note: This test re-creates Distributed Systems for its own purposes and uses a Loner
* distributed systems to isolate the Bridge Client.
*/
@Test
public void test028DynamicRegionCreation() {
final String name = this.getName();
final VM client1 = vm0;
final VM srv1 = vm2;
final VM srv2 = vm3;
final String key1 = name + "-key1";
final String value1 = name + "-val1";
final String key2 = name + "-key2";
final String value2 = name + "-val2";
final String key3 = name + "-key3";
final String value3 = name + "-val3";
// setup servers
Stream.of(srv1, srv2).forEach(vm -> vm.invoke("Create Cache Server", () -> {
createDynamicRegionCache(null); // Creates a new DS and Cache
assertThat(DynamicRegionFactory.get().isOpen()).isTrue();
startBridgeServer(0);
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(false);
Region<Object, Object> region = createRootRegion(name, factory);
region.put(key1, value1);
assertThat(region.get(key1)).isEqualTo(value1);
}));
final String srv1Host = NetworkUtils.getServerHostName();
final int srv1Port = srv1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final int srv2Port = srv2.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
// setup clients, do basic tests to make sure pool with notifier work as advertised
client1.invoke("Create Cache Client", () -> {
createLonerDS();
AttributesFactory<Object, Object> factory = new AttributesFactory<>();
factory.setConcurrencyChecksEnabled(false);
Pool cp = configureConnectionPool(factory, srv1Host, new int[] {srv1Port,
srv2Port}, true, -1, -1, null);
assertThat(cp).isNotNull();
{
final PoolImpl pool = (PoolImpl) cp;
await().untilAsserted(() -> {
assertThat(pool.getPrimary()).isNotNull();
assertThat(pool.getRedundants()).isNotEmpty();
});
assertThat(pool.getRedundants())
.describedAs("backups=" + pool.getRedundants() + " expected=" + 1).isNotEmpty();
}
createDynamicRegionCache("testPool");
assertThat(DynamicRegionFactory.get().isOpen()).isTrue();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
factory.addCacheListener(new CertifiableTestCacheListener<>());
Region<Object, Object> region = createRootRegion(name, factory.create());
assertThat(region.getEntry(key1)).isNull();
region.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES); // this should match
// the key
assertThat(value1).isEqualTo(region.getEntry(key1).getValue()); // Update via registered
// interest
assertThat(region.getEntry(key2)).isNull();
region.put(key2, value2); // use the Pool
assertThat(value2).isEqualTo(region.getEntry(key2).getValue()); // Ensure that the notifier
// didn't un-do the put, bug 35355
region.put(key3, value3); // setup a key for invalidation from a notifier;
});
srv1.invoke("Validate Server1 update", () -> {
CacheClientNotifier ccn = getInstance();
final CacheClientNotifierStats ccnStats = ccn.getStats();
final int eventCount = ccnStats.getEvents();
Region<Object, Object> region = getRootRegion(name);
assertThat(region).isNotNull();
assertThat(value2)
.isEqualTo(region.getEntry(key2).getValue()); // Validate the Pool worked,
// getEntry works because of the mirror
assertThat(value3).isEqualTo(region.getEntry(key3).getValue()); // Make sure we have the
// other entry to use for notification
region.put(key3, value1); // Change k3, sending some data to the client notifier
// Wait for the update to propagate to the clients
await("waiting for ccnStat").until(() -> ccnStats.getEvents() > eventCount);
});
srv2.invoke("Validate Server2 update", () -> {
Region<Object, Object> rootRegion = getRootRegion(name);
assertThat(rootRegion).isNotNull();
assertThat(value2).isEqualTo(rootRegion.getEntry(key2).getValue()); // Validate the Pool
// worked, getEntry works because of the mirror
assertThat(value1).isEqualTo(rootRegion.getEntry(key3).getValue()); // From peer update;
});
client1.invoke("Validate Client notification", () -> {
Region<Object, Object> rootRegion = getRootRegion(name);
assertThat(rootRegion).isNotNull();
CertifiableTestCacheListener<Object, Object> ctl =
(CertifiableTestCacheListener<Object, Object>) rootRegion.getAttributes()
.getCacheListeners()[0];
ctl.waitForUpdated(key3);
assertThat(value1).isEqualTo(rootRegion.getEntry(key3).getValue()); // Ensure that the
// notifier updated the entry
});
// Ok, now we are ready to do some dynamic region action!
final String v1Dynamic = value1 + "dynamic";
final String dynFromClientName = name + "-dynamic-client";
final String dynFromServerName = name + "-dynamic-server";
client1.invoke("Client dynamic region creation", () -> {
assertThat(DynamicRegionFactory.get().isOpen()).isTrue();
Region<Object, Object> region = getRootRegion(name);
assertThat(region).isNotNull();
Region<Object, Object> dynamicRegion =
DynamicRegionFactory.get().createDynamicRegion(name, dynFromClientName);
assertThat(dynamicRegion.get(key1)).isNull(); // This should be enough to validate the
// creation on the
// server
dynamicRegion.put(key1, v1Dynamic);
assertThat(v1Dynamic).isEqualTo(dynamicRegion.getEntry(key1).getValue());
});
// Assert the servers have the dynamic region and the new value
Stream.of(srv1, srv2)
.forEach(vm -> vm.invoke("Validate dynamic region creation on server", () -> {
Region<Object, Object> rootRegion = getRootRegion(name);
assertThat(rootRegion).isNotNull();
await().untilAsserted(() -> {
Region<Object, Object> region = rootRegion.getSubregion(dynFromClientName);
assertThat(region).isNotNull();
assertThat(getCache().getRegion(name + SEPARATOR + dynFromClientName))
.isNotNull();
});
Region<Object, Object> dynamicRegion = rootRegion.getSubregion(dynFromClientName);
assertThat(v1Dynamic).isEqualTo(dynamicRegion.getEntry(key1).getValue());
}));
// now delete the dynamic region and see if it goes away on servers
client1.invoke("Client dynamic region destruction", () -> {
assertThat(DynamicRegionFactory.get().isActive()).isTrue();
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
String drName = r.getFullPath() + SEPARATOR + dynFromClientName;
assertThat(getCache().getRegion(drName)).isNotNull();
DynamicRegionFactory.get().destroyDynamicRegion(drName);
assertThat(getCache().getRegion(drName)).isNull();
});
// Assert the servers no longer have the dynamic region
Stream.of(srv1, srv2)
.forEach(vm -> vm.invoke("Validate dynamic region destruction on server", () -> {
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
String drName = r.getFullPath() + SEPARATOR + dynFromClientName;
assertThat(getCache().getRegion(drName)).isNull();
try {
DynamicRegionFactory.get().destroyDynamicRegion(drName);
fail("expected RegionDestroyedException");
} catch (RegionDestroyedException ignored) {
}
}));
// Now try the reverse, create a dynamic region on the server and see if the client
// has it
srv2.invoke("Server dynamic region creation", () -> {
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
Region<Object, Object> dr =
DynamicRegionFactory.get().createDynamicRegion(name, dynFromServerName);
assertThat(dr.get(key1)).isNull();
dr.put(key1, v1Dynamic);
assertThat(v1Dynamic).isEqualTo(dr.getEntry(key1).getValue());
});
// Assert the servers have the dynamic region and the new value
srv1.invoke("Validate dynamic region creation propagation to other server", () -> {
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
Region<Object, Object> dr = waitForSubRegion(r, dynFromServerName);
assertThat(dr).isNotNull();
assertThat(getCache().getRegion(name + SEPARATOR + dynFromServerName))
.isNotNull();
waitForEntry(dr, key1);
assertThat(dr.getEntry(key1)).isNotNull();
assertThat(v1Dynamic).isEqualTo(dr.getEntry(key1).getValue());
});
// Assert the clients have the dynamic region and the new value
client1.invoke("Validate dynamic region creation on client", () -> {
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
Region<Object, Object> dr;
await().pollInterval(1, SECONDS).until(() -> r.getSubregion(dynFromServerName) != null
&& getCache().getRegion(name + SEPARATOR + dynFromServerName) != null);
dr = r.getSubregion(dynFromServerName);
waitForEntry(dr, key1);
assertThat(dr.getEntry(key1)).isNotNull();
assertThat(v1Dynamic).isEqualTo(dr.getEntry(key1).getValue());
});
// now delete the dynamic region on a server and see if it goes away on client
srv2.invoke("Server dynamic region destruction", () -> {
assertThat(DynamicRegionFactory.get().isActive()).isTrue();
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
String drName = r.getFullPath() + SEPARATOR + dynFromServerName;
assertThat(getCache().getRegion(drName)).isNotNull();
DynamicRegionFactory.get().destroyDynamicRegion(drName);
assertThat(getCache().getRegion(drName)).isNull();
});
srv1.invoke("Validate dynamic region destruction on other server", () -> {
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
String drName = r.getFullPath() + SEPARATOR + dynFromServerName;
await().ignoreException(RegionDestroyedException.class)
.until(() -> getCache().getRegion(drName) == null);
assertThat(getCache().getRegion(drName)).isNull();
});
// Assert the clients no longer have the dynamic region
client1.invoke("Validate dynamic region destruction on client", () -> {
Region<Object, Object> r = getRootRegion(name);
assertThat(r).isNotNull();
String drName = r.getFullPath() + SEPARATOR + dynFromServerName;
await().ignoreException(RegionDestroyedException.class)
.until(() -> getCache().getRegion(drName) == null);
Throwable thrown =
catchThrowable(() -> DynamicRegionFactory.get().destroyDynamicRegion(drName));
assertThat(thrown).isInstanceOf(RegionDestroyedException.class);
});
}
/**
* Test for bug 36279
*/
@Test
public void test029EmptyByteArray() throws CacheException {
final String name = this.getName();
final Object createCallbackArg = "CREATE CALLBACK ARG";
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 1; i++) {
region.create(i, new byte[0], createCallbackArg);
}
for (int i = 0; i < 1; i++) {
Region.Entry entry = region.getEntry(i);
assertThat(entry).isNotNull();
byte[] value = (byte[]) entry.getValue();
assertThat(value).isNotNull();
assertThat(value).isEmpty();
}
});
vm0.invoke("Verify values on server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 1; i++) {
Region.Entry entry = region.getEntry(i);
assertThat(entry).isNotNull();
byte[] value = (byte[]) entry.getValue();
assertThat(value).isNotNull();
assertThat(value).isEmpty();
}
});
vm1.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
});
}
/**
* Tests interest list registration with callback arg
*/
@Test
public void test030InterestListRegistrationWithCallbackArg() throws CacheException {
final String name = this.getName();
// Create cache server
vm0.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {
}
};
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(cl, null);
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server clients
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", () -> {
getLonerSystem();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
factory.addCacheListener(new ControlListener());
createRegion(name, factory);
}));
// VM1 Register interest
vm1.invoke("Create Entries and Register Interest", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
// This call will cause no value to be put into the region
region.registerInterest("key-1", InterestResultPolicy.NONE);
});
// VM2 Put entry (this will cause a create event in both VM1 and VM2)
vm2.invoke("Put Value", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.create("key-1", "key-1-create", "key-1-create");
});
// VM2 Put entry (this will cause an update event in both VM1 and VM2)
vm2.invoke("Put Value", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.put("key-1", "key-1-update", "key-1-update");
});
// VM2 Destroy entry (this will cause a destroy event)
vm2.invoke("Destroy Entry", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.destroy("key-1", "key-1-destroy");
});
vm1.invoke("Verify events", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0];
int eventCount = 3;
listener.waitWhileNotEnoughEvents(eventCount);
assertThat(eventCount).isEqualTo(listener.events.size());
{
EventWrapper ew = listener.events.get(0);
assertThat(ew.type).isEqualTo(TYPE_CREATE);
Object key = "key-1";
assertThat(key).isEqualTo(ew.event.getKey());
assertThat(ew.event.getOldValue()).isNull();
assertThat("key-1-create").isEqualTo(ew.event.getNewValue());
assertThat(Operation.CREATE).isEqualTo(ew.event.getOperation());
assertThat("key-1-create").isEqualTo(ew.event.getCallbackArgument());
assertThat(ew.event.isOriginRemote()).isTrue();
ew = listener.events.get(1);
assertThat(ew.type).isEqualTo(TYPE_UPDATE);
assertThat(key).isEqualTo(ew.event.getKey());
assertThat("key-1-create").isEqualTo(ew.event.getOldValue());
assertThat("key-1-update").isEqualTo(ew.event.getNewValue());
assertThat(Operation.UPDATE).isEqualTo(ew.event.getOperation());
assertThat("key-1-update").isEqualTo(ew.event.getCallbackArgument());
assertThat(ew.event.isOriginRemote()).isTrue();
ew = listener.events.get(2);
assertThat(ew.type).isEqualTo(TYPE_DESTROY);
assertThat("key-1-destroy").isEqualTo(ew.arg);
assertThat(key).isEqualTo(ew.event.getKey());
assertThat("key-1-update").isEqualTo(ew.event.getOldValue());
assertThat(ew.event.getNewValue()).isNull();
assertThat(Operation.DESTROY).isEqualTo(ew.event.getOperation());
assertThat("key-1-destroy").isEqualTo(ew.event.getCallbackArgument());
assertThat(ew.event.isOriginRemote()).isTrue();
}
});
// Close cache server clients
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Tests the keySetOnServer operation of the {@link Pool}
*
* @since GemFire 5.0.2
*/
@Test
public void test031KeySetOnServer() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(false);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
}));
vm2.invoke("Get keys on server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
Set keySet = region.keySetOnServer();
assertThat(keySet).isNotNull();
assertThat(keySet).isEmpty();
});
vm1.invoke("Put values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(i, i);
}
});
vm2.invoke("Get keys on server", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
Set keySet = region.keySetOnServer();
assertThat(keySet).isNotNull();
assertThat(10).isEqualTo(keySet.size());
});
Stream.of(vm1, vm2).forEach(vm -> vm.invoke("Close Pool", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}));
}
/**
* Tests that creating, putting and getting a non-serializable key or value throws the correct
* (NotSerializableException) exception.
*/
@Test
public void test033NotSerializableException() throws CacheException {
final String name = this.getName();
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory);
startBridgeServer(0);
});
final int port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final String host0 = NetworkUtils.getServerHostName();
vm1.invoke("Create region", () -> {
getLonerSystem();
getCache();
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
configureConnectionPool(factory, host0, new int[] {port}, true, -1, -1, null);
createRegion(name, factory);
Region<Object, Object> region = getRootRegion().getSubregion(name);
Throwable thrown =
catchThrowable(() -> region.create(1, new ConnectionPoolTestNonSerializable()));
assertThat(thrown).hasCauseInstanceOf(NotSerializableException.class);
Throwable thrown2 =
catchThrowable(() -> region.put(1, new ConnectionPoolTestNonSerializable()));
assertThat(thrown2).hasCauseInstanceOf(NotSerializableException.class);
Throwable thrown3 =
catchThrowable(() -> region.get(new ConnectionPoolTestNonSerializable()));
assertThat(thrown3).hasCauseInstanceOf(NotSerializableException.class);
region.localDestroyRegion();
});
}
static class ConnectionPoolTestNonSerializable {
ConnectionPoolTestNonSerializable() {}
}
/**
* Tests 'notify-all' client updates. This test verifies that: - only invalidates are sent as part
* of the 'notify-all' mode of client updates - originators of updates are not sent invalidates -
* non-originators of updates are sent invalidates - multiple invalidates are not sent for the
* same update
*/
@Test
public void test034NotifyAllUpdates() throws CacheException {
final String name = this.getName();
disconnectAllFromDS();
for (VM vm : new VM[] {vm0, vm1}) {
// Create the cache servers with distributed, mirrored region
vm.invoke("Create Cache Server", () -> {
CacheLoader<Object, Object> cl = new CacheLoader<Object, Object>() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {
}
};
RegionFactory<Object, Object> factory =
getBridgeServerMirroredAckRegionAttributes(cl);
createRegion(name, factory);
startBridgeServer(0);
});
}
// Create cache server clients
final int numberOfKeys = 10;
final String host0 = NetworkUtils.getServerHostName();
final int vm0Port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
final int vm1Port = vm1.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
Stream.of(vm2, vm3).forEach(vm -> {
logger.info("before create client");
vm.invoke("Create Cache Server Client", () -> {
// reset all static listener variables in case this is being rerun in a subclass
numberOfAfterInvalidates = 0;
numberOfAfterCreates = 0;
numberOfAfterUpdates = 0;
getLonerSystem();
// create the region
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {vm0Port, vm1Port}, true, -1,
-1, null);
createRegion(name, factory);
});
});
// Initialize each client with entries (so that afterInvalidate is called)
Stream.of(vm2, vm3).forEach(vm -> {
logger.info("before initialize client");
vm.invoke("Initialize Client", () -> {
numberOfAfterInvalidates = 0;
numberOfAfterCreates = 0;
numberOfAfterUpdates = 0;
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < numberOfKeys; i++) {
assertThat("key-" + i).isEqualTo(region.get("key-" + i));
}
});
});
// Add a CacheListener to both vm2 and vm3
vm2.invoke("Add CacheListener 1", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CacheListener<Object, Object> listener = new CacheListenerAdapter<Object, Object>() {
@Override
public void afterCreate(EntryEvent e) {
numberOfAfterCreates++;
logger.info("vm2 numberOfAfterCreates: " + numberOfAfterCreates);
}
@Override
public void afterUpdate(EntryEvent e) {
numberOfAfterUpdates++;
logger.info("vm2 numberOfAfterUpdates: " + numberOfAfterUpdates);
}
@Override
public void afterInvalidate(EntryEvent e) {
numberOfAfterInvalidates++;
logger.info("vm2 numberOfAfterInvalidates: " + numberOfAfterInvalidates);
}
};
region.getAttributesMutator().addCacheListener(listener);
region.registerInterestRegex(".*", false, false);
});
vm3.invoke("Add CacheListener 2", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
CacheListener<Object, Object> listener = new CacheListenerAdapter<Object, Object>() {
@Override
public void afterCreate(EntryEvent e) {
numberOfAfterCreates++;
}
@Override
public void afterUpdate(EntryEvent e) {
numberOfAfterUpdates++;
}
@Override
public void afterInvalidate(EntryEvent e) {
numberOfAfterInvalidates++;
}
};
region.getAttributesMutator().addCacheListener(listener);
region.registerInterestRegex(".*", false, false);
});
vm2.invoke(() -> await().untilAsserted(() -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < numberOfKeys; i++) {
assertThat("key-" + i).isEqualTo(region.get("key-" + i));
}
}));
// Use vm2 to put new values
// This should cause 10 afterUpdates to vm2 and 10 afterInvalidates to vm3
vm2.invoke("Put New Values", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put("key-" + i, "key-" + i);
}
});
vm2.invoke(() -> {
await("VM2 should not have received any afterCreate messages")
.until(() -> ConnectionPoolDUnitTest.getNumberOfAfterCreates() == 0);
await("VM2 should not have received any afterInvalidate messages")
.until(() -> ConnectionPoolDUnitTest.getNumberOfAfterInvalidates() == 0);
});
long vm2AfterUpdates = vm2.invoke(ConnectionPoolDUnitTest::getNumberOfAfterUpdates);
long vm3AfterInvalidates = vm3.invoke(ConnectionPoolDUnitTest::getNumberOfAfterInvalidates);
assertThat(vm2AfterUpdates)
.describedAs("VM2 received " + vm2AfterUpdates
+ " afterUpdate messages. It should have received " + numberOfKeys)
.isEqualTo(numberOfKeys);
assertThat(vm3AfterInvalidates)
.describedAs("VM3 received " + vm3AfterInvalidates
+ " afterInvalidate messages. It should have received " + numberOfKeys)
.isEqualTo(numberOfKeys);
}
static class DelayListener extends CacheListenerAdapter<Object, Object> {
private final int delay;
DelayListener() {
this.delay = 25;
}
private void delay() {
try {
Thread.sleep(this.delay);
} catch (InterruptedException ignore) {
fail("interrupted");
}
}
@Override
public void afterCreate(EntryEvent event) {
delay();
}
@Override
public void afterDestroy(EntryEvent event) {
delay();
}
@Override
public void afterInvalidate(EntryEvent event) {
delay();
}
@Override
public void afterRegionDestroy(RegionEvent event) {
delay();
}
@Override
public void afterRegionCreate(RegionEvent event) {
delay();
}
@Override
public void afterRegionInvalidate(RegionEvent event) {
delay();
}
@Override
public void afterUpdate(EntryEvent event) {
delay();
}
@Override
public void afterRegionClear(RegionEvent event) {
delay();
}
@Override
public void afterRegionLive(RegionEvent event) {
delay();
}
}
/**
* Make sure a tx done in a server on an empty region gets sent to clients who have registered
* interest.
*/
@Test
public void test037Bug39526part1() throws CacheException {
final String name = this.getName();
// Create the cache servers with distributed, empty region
logger.info("before create server");
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setConcurrencyChecksEnabled(false);
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server client
final String host0 = NetworkUtils.getServerHostName();
final int vm0Port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
logger.info("before create client");
vm1.invoke("Create Cache Server Client", () -> {
getLonerSystem();
// create the region
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {vm0Port}, true, -1, -1,
null);
createRegion(name, factory);
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.registerInterestRegex(".*");
});
// now do a tx in the server
logger.info("before doServerTx");
vm0.invoke("doServerTx", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
Cache cache1 = getCache();
CacheTransactionManager txMgr = cache1.getCacheTransactionManager();
txMgr.begin();
try {
region.put("k1", "v1");
region.put("k2", "v2");
region.put("k3", "v3");
} finally {
txMgr.commit();
}
});
// now verify that the client receives the committed data
logger.info("before confirmCommitOnClient");
vm1.invoke("Validate Cache Server Client", () -> {
final Region<Object, Object> region = getRootRegion().getSubregion(name);
// wait for a while for us to have the correct number of entries
await().alias("waiting for region to be size 3").until(() -> region.size() == 3);
assertThat(region.containsKey("k1")).isTrue();
assertThat(region.containsKey("k2")).isTrue();
assertThat(region.containsKey("k3")).isTrue();
assertThat("v1").isEqualTo(region.getEntry("k1").getValue());
assertThat("v2").isEqualTo(region.getEntry("k2").getValue());
assertThat("v3").isEqualTo(region.getEntry("k3").getValue());
});
}
/**
* Now confirm that a tx done in a peer of a server (the server having an empty region and wanting
* all events) sends the tx to its clients
*/
@Test
public void test038Bug39526part2() throws CacheException {
disconnectAllFromDS();
final String name = this.getName();
// Create the cache servers with distributed, empty region
logger.info("before create server");
vm0.invoke("Create Cache Server", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(false);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
createRegion(name, factory);
startBridgeServer(0);
});
// Create cache server client
final String host0 = NetworkUtils.getServerHostName();
final int vm0Port = vm0.invoke(ConnectionPoolDUnitTest::getCacheServerPort);
logger.info("before create client");
vm1.invoke("Create Cache Server Client", () -> {
getLonerSystem();
// create the region
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
configureConnectionPool(factory, host0, new int[] {vm0Port}, true, -1, -1,
null);
createRegion(name, factory);
Region<Object, Object> region = getRootRegion().getSubregion(name);
region.registerInterestRegex(".*");
});
logger.info("before create server peer");
vm2.invoke("Create Server Peer", () -> {
RegionFactory<Object, Object> factory = getCache().createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setConcurrencyChecksEnabled(false);
createRegion(name, factory);
});
// now do a tx in the server
logger.info("before doServerTx");
vm2.invoke("doServerTx", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
Cache cache1 = getCache();
CacheTransactionManager txmgr = cache1.getCacheTransactionManager();
txmgr.begin();
try {
region.put("k1", "v1");
region.put("k2", "v2");
region.put("k3", "v3");
} finally {
txmgr.commit();
}
});
// now verify that the client receives the committed data
logger.info("before confirmCommitOnClient");
vm1.invoke("Validate Cache Server Client", () -> {
final Region<Object, Object> region = getRootRegion().getSubregion(name);
// wait for a while for us to have the correct number of entries
await().alias("waiting for region to be size 3").until(() -> region.size() == 3);
assertThat(region.containsKey("k1")).isTrue();
assertThat(region.containsKey("k2")).isTrue();
assertThat(region.containsKey("k3")).isTrue();
assertThat("v1").isEqualTo(region.getEntry("k1").getValue());
assertThat("v2").isEqualTo(region.getEntry("k2").getValue());
assertThat("v3").isEqualTo(region.getEntry("k3").getValue());
});
disconnectAllFromDS();
}
@SuppressWarnings("WeakerAccess")
static class Order implements DataSerializable {
int index;
public Order() {}
public void init(int index) {
this.index = index;
}
public int getIndex() {
return index;
}
@Override
public void toData(DataOutput out) throws IOException {
out.writeInt(index);
}
@Override
public void fromData(DataInput in) throws IOException {
index = in.readInt();
}
}
}