blob: e9b795a633c1dd8de2e6b0f8a37e23f1dce065ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache;
import static org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.getInstance;
import static org.apache.geode.internal.logging.LogWriterLevel.ALL;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.junit.runners.MethodSorters.NAME_ASCENDING;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializable;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.cache30.CertifiableTestCacheListener;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.cache30.TestCacheLoader;
import org.apache.geode.cache30.TestCacheWriter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.EntryExpiryTask;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PoolStats;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifierStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.logging.LocalLogWriter;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* This class tests the client connection pool in GemFire. It does so by creating a cache server
* with a cache and a pre-defined region and a data loader. The client creates the same region with
* a pool (this happens in the controller VM). The client then spins up 10 different threads and
* issues gets on keys. The server data loader returns the data to the client.
*
* Test uses Groboutils TestRunnable objects to achieve multi threading behavior in the test.
*/
@Category({ClientServerTest.class})
@FixMethodOrder(NAME_ASCENDING)
public class ConnectionPoolDUnitTest extends JUnit4CacheTestCase {
/**
 * The port on which the cache server was started in this VM. Assigned from
 * {@code CacheServer.getPort()} by createBridgeServer and startBridgeServer.
 */
private static int bridgeServerPort;
// NOTE(review): port and port2 are not assigned anywhere in the visible part of this file;
// presumably later tests or subclasses keep server ports here — confirm against usages.
protected static int port = 0;
protected static int port2 = 0;
// Static per-VM listener counters; the client runnables of test003Bug36684 reset all three to 0.
protected static int numberOfAfterInvalidates;
protected static int numberOfAfterCreates;
protected static int numberOfAfterUpdates;
// Event-type tags that ControlListener passes to EventWrapper for each callback it records.
protected static final int TYPE_CREATE = 0;
protected static final int TYPE_UPDATE = 1;
protected static final int TYPE_INVALIDATE = 2;
protected static final int TYPE_DESTROY = 3;
/**
 * Connects the controller VM and every dunit VM to the distributed system, then runs the
 * {@link #postSetUpConnectionPoolDUnitTest()} hook.
 */
@Override
public final void postSetUp() throws Exception {
  // avoid IllegalStateException from HandShake by connecting all vms to
  // system before creating pool
  getSystem();
  final SerializableRunnable connectToSystem = new SerializableRunnable("getSystem") {
    @Override
    public void run() {
      getSystem();
    }
  };
  Invoke.invokeInEveryVM(connectToSystem);
  postSetUpConnectionPoolDUnitTest();
}
/**
 * Hook called as the last step of {@link #postSetUp()}. Does nothing in this class.
 */
protected void postSetUpConnectionPoolDUnitTest() throws Exception {}
/**
 * Verifies in every VM that no pools are left registered with {@link PoolManager}, logging a
 * warning and failing if any remain, then runs the
 * {@link #postTearDownConnectionPoolDUnitTest()} hook.
 */
@Override
public final void postTearDownCacheTestCase() throws Exception {
  final SerializableRunnable checkNoPoolsRemain = new SerializableRunnable() {
    @Override
    public void run() {
      final Map remaining = PoolManager.getAll();
      if (remaining.isEmpty()) {
        return;
      }
      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
          .warning("found pools remaining after teardown: " + remaining);
      assertEquals(0, remaining.size());
    }
  };
  Invoke.invokeInEveryVM(checkNoPoolsRemain);
  postTearDownConnectionPoolDUnitTest();
}
/**
 * Hook called as the last step of {@link #postTearDownCacheTestCase()}, after the leftover-pool
 * check. Does nothing in this class.
 */
protected void postTearDownConnectionPoolDUnitTest() throws Exception {}
/**
 * Looks up the pool named in the region's attributes.
 *
 * @param r the region whose attributes are consulted
 * @return the pool found by {@link PoolManager#find(String)}, or {@code null} when the region
 *         has no pool name
 */
protected static PoolImpl getPool(Region r) {
  final String poolName = r.getAttributes().getPoolName();
  if (poolName == null) {
    return null;
  }
  return (PoolImpl) PoolManager.find(poolName);
}
/**
 * Returns the region's cache writer, cast to {@link TestCacheWriter}.
 *
 * @param r the region whose attributes hold the writer
 */
protected static TestCacheWriter getTestWriter(Region r) {
  final CacheWriter installedWriter = r.getAttributes().getCacheWriter();
  return (TestCacheWriter) installedWriter;
}
/**
 * Adds a cache server configured for the given port to this VM's cache without starting it, and
 * records the port the server reports in {@code bridgeServerPort}.
 *
 * @param port the port to configure on the new cache server
 * @since GemFire 5.0.2
 */
protected void createBridgeServer(int port) throws IOException {
  final CacheServer server = getCache().addCacheServer();
  server.setPort(port);
  server.setMaxThreads(getMaxThreads());
  bridgeServerPort = server.getPort();
}
/**
 * Starts a cache server on the given port, leaving the socket buffer size unset (-1) and using
 * the default load poll interval.
 *
 * @param port the port to configure on the cache server
 * @since GemFire 4.0
 */
protected void startBridgeServer(int port) throws IOException {
  final int leaveSocketBufferSizeUnset = -1;
  startBridgeServer(port, leaveSocketBufferSizeUnset);
}
/**
 * Starts a cache server on the given port using {@link CacheServer#DEFAULT_LOAD_POLL_INTERVAL}.
 *
 * @param port the port to configure on the cache server
 * @param socketBufferSize the socket buffer size to set, or -1 to leave it unset
 */
protected void startBridgeServer(int port, int socketBufferSize) throws IOException {
  final long defaultLoadPollInterval = CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
  startBridgeServer(port, socketBufferSize, defaultLoadPollInterval);
}
/**
 * Adds a cache server to this VM's cache, configures it, starts it, and records the port it
 * reports in {@code bridgeServerPort}.
 *
 * @param port the port to configure on the cache server
 * @param socketBufferSize the socket buffer size to set, or -1 to leave it unset
 * @param loadPollInterval the load poll interval to set on the server
 */
protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval)
    throws IOException {
  final CacheServer server = getCache().addCacheServer();
  server.setPort(port);
  final boolean bufferSizeRequested = socketBufferSize != -1;
  if (bufferSizeRequested) {
    server.setSocketBufferSize(socketBufferSize);
  }
  server.setMaxThreads(getMaxThreads());
  server.setLoadPollInterval(loadPollInterval);
  server.start();
  bridgeServerPort = server.getPort();
}
/**
 * Supplies the value passed to {@code setMaxThreads} on every cache server created by
 * createBridgeServer and startBridgeServer.
 *
 * By default return 0 which turns off selector and gives thread per cnx. Test subclasses can
 * override to run with selector.
 *
 * @return 0 in this class
 * @since GemFire 5.1
 */
protected int getMaxThreads() {
return 0;
}
/**
 * Stops the first cache server returned by the given cache and asserts that it is no longer
 * running.
 *
 * @param cache the cache whose first cache server is stopped
 * @since GemFire 4.0
 */
void stopBridgeServer(Cache cache) {
  final List<CacheServer> servers = cache.getCacheServers();
  final CacheServer first = servers.iterator().next();
  first.stop();
  assertFalse(first.isRunning());
}
/**
 * Stops every cache server of the given cache, asserting after each stop that the server is no
 * longer running.
 */
void stopBridgeServers(Cache cache) {
  for (Object candidate : cache.getCacheServers()) {
    final CacheServer server = (CacheServer) candidate;
    server.stop();
    assertFalse(server.isRunning());
  }
}
/**
 * Starts every cache server of the given cache, asserting after each start that the server is
 * running.
 */
private void restartBridgeServers(Cache cache) throws IOException {
  for (Object candidate : cache.getCacheServers()) {
    final CacheServer server = (CacheServer) candidate;
    server.start();
    assertTrue(server.isRunning());
  }
}
/**
 * Disconnects from the current distributed system, obtains a loner system, and asserts that its
 * distribution manager reports no other distribution manager ids.
 *
 * @return the loner distributed system
 */
protected InternalDistributedSystem createLonerDS() {
  disconnectFromDS();
  final InternalDistributedSystem loner = getLonerSystem();
  final int otherManagers =
      loner.getDistributionManager().getOtherDistributionManagerIds().size();
  assertEquals(0, otherManagers);
  return loner;
}
/**
 * Builds region attributes for a <code>LOCAL</code>-scope region with concurrency checks
 * disabled.
 */
protected RegionAttributes getRegionAttributes() {
  final AttributesFactory localFactory = new AttributesFactory();
  localFactory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
  localFactory.setScope(Scope.LOCAL);
  return localFactory.create();
}
/**
 * Builds a comma-separated endpoint list of the form {@code name0=host:port0,name1=host:port1}.
 *
 * @param host the host name used for every entry
 * @param ports one port per entry, in output order
 * @return the joined endpoint string; empty when {@code ports} is empty
 */
private static String createBridgeClientConnection(String host, int[] ports) {
  final StringBuilder spec = new StringBuilder();
  int index = 0;
  for (final int serverPort : ports) {
    if (index != 0) {
      spec.append(",");
    }
    spec.append("name").append(index).append("=");
    spec.append(host).append(":").append(serverPort);
    index++;
  }
  return spec.toString();
}
/**
 * Snapshot of an {@link EntryEvent} taken at construction time: the event itself, its key, new
 * value and callback argument, plus a caller-supplied type tag.
 */
private class EventWrapper {
  public final EntryEvent event;
  public final Object key;
  public final Object val;
  public final Object arg;
  public final int type;

  public EventWrapper(EntryEvent ee, int type) {
    event = ee;
    key = ee.getKey();
    val = ee.getNewValue();
    arg = ee.getCallbackArgument();
    this.type = type;
  }

  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("EventWrapper: event=");
    text.append(event).append(", type=").append(type);
    return text.toString();
  }
}
/**
 * Cache listener that records every create, update, invalidate and destroy callback as an
 * {@code EventWrapper} in {@link #events} and wakes up threads waiting on {@link #CONTROL_LOCK}.
 */
protected class ControlListener extends CacheListenerAdapter {
  public final LinkedList events = new LinkedList();
  public final Object CONTROL_LOCK = new Object();

  /**
   * Waits until at least {@code eventCount} events are recorded or fewer than 10 ms of the
   * {@code sleepMs} budget remain.
   *
   * @return {@code true} if at least one event is recorded when the wait ends
   */
  public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) {
    final long deadline = System.currentTimeMillis() + sleepMs;
    synchronized (CONTROL_LOCK) {
      try {
        long remaining = deadline - System.currentTimeMillis();
        while (events.size() < eventCount && remaining >= 10) {
          CONTROL_LOCK.wait(remaining);
          remaining = deadline - System.currentTimeMillis();
        }
      } catch (InterruptedException abort) {
        fail("interrupted");
      }
      return events.size() > 0;
    }
  }

  @Override
  public void afterCreate(EntryEvent e) {
    record(e, TYPE_CREATE);
  }

  @Override
  public void afterUpdate(EntryEvent e) {
    record(e, TYPE_UPDATE);
  }

  @Override
  public void afterInvalidate(EntryEvent e) {
    record(e, TYPE_INVALIDATE);
  }

  @Override
  public void afterDestroy(EntryEvent e) {
    record(e, TYPE_DESTROY);
  }

  /** Appends the event with its type tag under the lock and notifies all waiters. */
  private void record(EntryEvent e, int type) {
    synchronized (CONTROL_LOCK) {
      events.add(new EventWrapper(e, type));
      CONTROL_LOCK.notifyAll();
    }
  }
}
/**
 * Builds a stub {@link EntryEvent} whose {@link CacheEvent#getRegion()} is the supplied region
 * and whose {@link CacheEvent#getOperation()} is
 * {@link org.apache.geode.cache.Operation#LOCAL_LOAD_CREATE}. Every other accessor returns a
 * fixed {@code null}, {@code true} or {@code false}.
 *
 * @param r the region the stub event reports
 * @return fake entry event
 */
protected static EntryEvent createFakeyEntryEvent(final Region r) {
  return new EntryEvent() {
    // ---- the two accessors that carry real information ----
    @Override
    public Region getRegion() {
      return r;
    }

    @Override
    public Operation getOperation() {
      return Operation.LOCAL_LOAD_CREATE; // fake out pool to exit early
    }

    // ---- key, values and callback argument: all absent ----
    @Override
    public Object getKey() {
      return null;
    }

    @Override
    public Object getOldValue() {
      return null;
    }

    @Override
    public Object getNewValue() {
      return null;
    }

    @Override
    public SerializedCacheValue getSerializedOldValue() {
      return null;
    }

    @Override
    public SerializedCacheValue getSerializedNewValue() {
      return null;
    }

    @Override
    public Object getCallbackArgument() {
      return null;
    }

    @Override
    public boolean isOldValueAvailable() {
      return true;
    }

    @Override
    public boolean isCallbackArgumentAvailable() {
      return true;
    }

    // ---- origin and transaction ----
    @Override
    public TransactionId getTransactionId() {
      return null;
    }

    @Override
    public DistributedMember getDistributedMember() {
      return null;
    }

    @Override
    public boolean isOriginRemote() {
      return false;
    }

    @Override
    public boolean hasClientOrigin() {
      return false;
    }

    // ---- methods the original declares without @Override; kept that way ----
    public boolean isLoad() {
      return true;
    }

    public boolean isLocalLoad() {
      return false;
    }

    public boolean isNetLoad() {
      return false;
    }

    public boolean isNetSearch() {
      return false;
    }

    public boolean isExpiration() {
      return false;
    }

    public boolean isDistributed() {
      return false;
    }

    public boolean isBridgeEvent() {
      return hasClientOrigin();
    }

    public ClientProxyMembershipID getContext() {
      return null;
    }
  };
}
/**
 * Waits for the pool to report {@code expectedServer} connected servers and then for every
 * endpoint to hold exactly {@code expectedConsPerServer} connections, asserting the balance once
 * more after the wait.
 */
public void verifyBalanced(final PoolImpl pool, int expectedServer,
    final int expectedConsPerServer) {
  verifyServerCount(pool, expectedServer);
  final WaitCriterion untilBalanced = new WaitCriterion() {
    @Override
    public String description() {
      return "expected " + expectedConsPerServer + " but endpoints=" + outOfBalanceReport(pool);
    }

    @Override
    public boolean done() {
      return balanced(pool, expectedConsPerServer);
    }
  };
  GeodeAwaitility.await().untilAsserted(untilBalanced);
  final String failureMessage =
      "expected " + expectedConsPerServer + " but endpoints=" + outOfBalanceReport(pool);
  assertEquals(failureMessage, true, balanced(pool, expectedConsPerServer));
}
/**
 * Reports whether every endpoint in the pool's endpoint map has exactly
 * {@code expectedConsPerServer} connections according to its stats.
 *
 * @return {@code true} when all endpoints match (also when there are no endpoints)
 */
protected boolean balanced(PoolImpl pool, int expectedConsPerServer) {
  for (Object candidate : pool.getEndpointMap().values()) {
    final Endpoint endpoint = (Endpoint) candidate;
    if (endpoint.getStats().getConnections() != expectedConsPerServer) {
      return false;
    }
  }
  return true;
}
/**
 * Formats the pool's endpoints and their connection counts as
 * {@code <ep=... conCount=..., ep=... conCount=...>} for use in failure messages.
 */
protected String outOfBalanceReport(PoolImpl pool) {
  final StringBuilder report = new StringBuilder("<");
  String separator = "";
  for (Object candidate : pool.getEndpointMap().values()) {
    final Endpoint endpoint = (Endpoint) candidate;
    report.append(separator);
    report.append("ep=" + endpoint);
    report.append(" conCount=" + endpoint.getStats().getConnections());
    separator = ", ";
  }
  return report.append(">").toString();
}
/**
 * Waits until the pool reports no denylisted servers, then asserts that the denylist is still
 * empty.
 *
 * @param pool the pool whose denylisted servers are polled
 */
public void waitForDenylistToClear(final PoolImpl pool) {
  WaitCriterion ev = new WaitCriterion() {
    @Override
    public boolean done() {
      return pool.getDenylistedServers().size() == 0;
    }

    @Override
    public String description() {
      // Previously returned null, which left the criterion with no diagnostic text; describe the
      // servers that are still denylisted instead.
      return "unexpected denylistedServers=" + pool.getDenylistedServers();
    }
  };
  GeodeAwaitility.await().untilAsserted(ev);
  assertEquals("unexpected denylistedServers=" + pool.getDenylistedServers(), 0,
      pool.getDenylistedServers().size());
}
/**
 * Logs the expected count and waits until the pool's connected server count equals
 * {@code expectedCount}.
 */
public void verifyServerCount(final PoolImpl pool, final int expectedCount) {
  getCache().getLogger().info("verifyServerCount expects=" + expectedCount);
  final WaitCriterion untilCountMatches = new WaitCriterion() {
    String excuse;

    @Override
    public String description() {
      return excuse;
    }

    @Override
    public boolean done() {
      final int actual = pool.getConnectedServerCount();
      final boolean matches = actual == expectedCount;
      if (!matches) {
        // remember why the latest poll failed so description() can report it
        excuse = "Found only " + actual + " servers, expected " + expectedCount;
      }
      return matches;
    }
  };
  GeodeAwaitility.await().untilAsserted(untilCountMatches);
}
/**
 * Tests that the callback argument is sent to the server.
 *
 * <p>
 * vm0 hosts a cache server whose {@link TestCacheWriter} asserts the callback argument of every
 * create and update it sees. vm1 is a pooled client that creates and then updates ten entries,
 * passing a distinct callback argument for each kind of operation.
 */
@Test
public void test001CallbackArg() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  final Object createCallbackArg = "CREATE CALLBACK ARG";
  final Object updateCallbackArg = "PUT CALLBACK ARG";
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      CacheWriter cw = new TestCacheWriter() {
        @Override
        public void beforeUpdate2(EntryEvent event) throws CacheWriterException {
          Object beca = event.getCallbackArgument();
          assertEquals(updateCallbackArg, beca);
        }

        @Override
        public void beforeCreate2(EntryEvent event) throws CacheWriterException {
          Object beca = event.getCallbackArgument();
          assertEquals(createCallbackArg, beca);
        }
      };
      AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      // Use the server host name resolved above, as the sibling tests do, instead of
      // re-deriving it inside the client VM from a captured Host.
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm1.invoke(new CacheSerializableRunnable("Add entries") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.create(Integer.valueOf(i), "old" + i, createCallbackArg);
      }
      for (int i = 0; i < 10; i++) {
        region.put(Integer.valueOf(i), "new" + i, updateCallbackArg);
      }
    }
  });
  vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      TestCacheWriter writer = getTestWriter(region);
      assertTrue(writer.wasInvoked());
    }
  });
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests that consecutive puts have the callback assigned appropriately: the client creates
 * even keys with a callback argument and odd keys without one, and the server-side cache writer
 * asserts that it sees exactly that.
 */
@Test
public void test002CallbackArg2() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  final VM serverVm = host.getVM(0);
  final VM clientVm = host.getVM(1);
  final Object createCallbackArg = "CREATE CALLBACK ARG";
  serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      final CacheWriter checkingWriter = new TestCacheWriter() {
        @Override
        public void beforeCreate2(EntryEvent event) throws CacheWriterException {
          final Integer key = (Integer) event.getKey();
          final Object seenArg = event.getCallbackArgument();
          if (key.intValue() % 2 != 0) {
            assertNull(seenArg);
          } else {
            assertEquals(createCallbackArg, seenArg);
          }
        }
      };
      final AttributesFactory serverFactory = getBridgeServerRegionAttributes(null, checkingWriter);
      createRegion(name, serverFactory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(serverVm.getHost());
  final SerializableRunnable createClientRegion = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      final AttributesFactory clientFactory = new AttributesFactory();
      clientFactory.setScope(Scope.LOCAL);
      clientFactory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(clientFactory, host0, port, -1, true, -1, -1,
          null);
      createRegion(name, clientFactory.create());
    }
  };
  clientVm.invoke(createClientRegion);
  clientVm.invoke(new CacheSerializableRunnable("Add entries") {
    @Override
    public void run2() throws CacheException {
      final Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        final Integer key = Integer.valueOf(i);
        final String value = "old" + i;
        if (i % 2 == 0) {
          region.create(key, value, createCallbackArg);
        } else {
          region.create(key, value);
        }
      }
    }
  });
  final SerializableRunnable closeClientRegion = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      getRootRegion().getSubregion(name).localDestroyRegion();
    }
  };
  clientVm.invoke(closeClientRegion);
  serverVm.invoke(new CacheSerializableRunnable("Check cache writer") {
    @Override
    public void run2() throws CacheException {
      final TestCacheWriter writer = getTestWriter(getRootRegion().getSubregion(name));
      assertTrue(writer.wasInvoked());
    }
  });
  serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests for bug 36684 by having two cache servers with cacheloaders that should always return a
 * value and two clients, each pooled against both servers, reading values concurrently. If the
 * bug exists, the clients will get null sometimes.
 */
@Test
public void test003Bug36684() throws CacheException, InterruptedException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  final VM serverVm0 = host.getVM(0);
  final VM serverVm1 = host.getVM(1);
  final VM clientVm2 = host.getVM(2);
  final VM clientVm3 = host.getVM(3);
  // Create the cache servers with distributed, mirrored region
  final SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // loader that answers every get with the requested key
      final CacheLoader echoKeyLoader = new CacheLoader() {
        @Override
        public void close() {}

        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey();
        }
      };
      final AttributesFactory serverFactory =
          getBridgeServerMirroredAckRegionAttributes(echoKeyLoader, null);
      createRegion(name, serverFactory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  getSystem().getLogWriter().info("before create server");
  serverVm0.invoke(createServer);
  serverVm1.invoke(createServer);
  // Create cache server clients
  final int numberOfKeys = 1000;
  final String host0 = NetworkUtils.getServerHostName(host);
  final int vm0Port = serverVm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final int vm1Port = serverVm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final SerializableRunnable createClient =
      new CacheSerializableRunnable("Create Cache Server Client") {
        @Override
        public void run2() throws CacheException {
          // reset all static listener variables in case this is being rerun in a subclass
          numberOfAfterInvalidates = 0;
          numberOfAfterCreates = 0;
          numberOfAfterUpdates = 0;
          // create the region
          getLonerSystem();
          final AttributesFactory clientFactory = new AttributesFactory();
          clientFactory.setScope(Scope.LOCAL);
          clientFactory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
          // attach a pool that knows about both servers
          ClientServerTestCase.configureConnectionPool(clientFactory, host0, vm0Port, vm1Port,
              true, -1, -1, null);
          createRegion(name, clientFactory.create());
        }
      };
  getSystem().getLogWriter().info("before create client");
  clientVm2.invoke(createClient);
  clientVm3.invoke(createClient);
  // Read every key through the pool; the server loader must always supply the key itself
  final SerializableRunnable initializeClient =
      new CacheSerializableRunnable("Initialize Client") {
        @Override
        public void run2() throws CacheException {
          numberOfAfterInvalidates = 0;
          numberOfAfterCreates = 0;
          numberOfAfterUpdates = 0;
          final LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
          for (int i = 0; i < numberOfKeys; i++) {
            final String expected = "key-" + i;
            final String actual = (String) region.get(expected);
            assertEquals(expected, actual);
          }
        }
      };
  getSystem().getLogWriter().info("before initialize client");
  final AsyncInvocation readsFromVm2 = clientVm2.invokeAsync(initializeClient);
  final AsyncInvocation readsFromVm3 = clientVm3.invokeAsync(initializeClient);
  final long joinTimeoutMillis = 30 * 1000;
  ThreadUtils.join(readsFromVm2, joinTimeoutMillis);
  ThreadUtils.join(readsFromVm3, joinTimeoutMillis);
  if (readsFromVm2.exceptionOccurred()) {
    org.apache.geode.test.dunit.Assert.fail("Error occurred in vm2",
        readsFromVm2.getException());
  }
  if (readsFromVm3.exceptionOccurred()) {
    org.apache.geode.test.dunit.Assert.fail("Error occurred in vm3",
        readsFromVm3.getException());
  }
}
/**
 * Test for client connection loss with CacheLoader Exception on the server.
 *
 * <p>
 * The server's loader always throws {@link CacheLoaderException}. The client issues gets through
 * a named pool and checks that the exception it receives is caused by a
 * {@code CacheLoaderException}, then compares the pool's connect/disconnect statistics taken
 * before and after the gets.
 */
@Test
public void test004ForCacheLoaderException() throws CacheException, InterruptedException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM server = host.getVM(0);
  VM client = host.getVM(1);
  // Create the cache server with a distributed, mirrored region whose loader always throws
  SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      CacheLoader cl = new CacheLoader() {
        @Override
        public Object load(LoaderHelper helper) {
          System.out.println("### CALLING CACHE LOADER....");
          throw new CacheLoaderException(
              "Test for CahceLoaderException causing Client connection to disconnect.");
        }

        @Override
        public void close() {}
      };
      AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  getSystem().getLogWriter().info("before create server");
  server.invoke(createServer);
  // Create cache server clients
  final int numberOfKeys = 10;
  final String host0 = NetworkUtils.getServerHostName(host);
  final int[] port =
      new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())};
  final String poolName = "myPool";
  SerializableRunnable createClient =
      new CacheSerializableRunnable("Create Cache Server Client") {
        @Override
        public void run2() throws CacheException {
          getLonerSystem();
          AttributesFactory factory = new AttributesFactory();
          factory.setScope(Scope.LOCAL);
          factory.setConcurrencyChecksEnabled(false);
          // attach the named pool to the region
          ClientServerTestCase.configureConnectionPoolWithName(factory, host0, port, true, -1, -1,
              null, poolName);
          createRegion(name, factory.create());
        }
      };
  getSystem().getLogWriter().info("before create client");
  client.invoke(createClient);
  // Issue gets from the client so that the server-side loader is invoked and throws
  SerializableRunnable invokeServerCacheLaoder =
      new CacheSerializableRunnable("Initialize Client") {
        @Override
        public void run2() throws CacheException {
          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
          PoolStats stats = ((PoolImpl) PoolManager.find(poolName)).getStats();
          int oldConnects = stats.getConnects();
          int oldDisConnects = stats.getDisConnects();
          try {
            for (int i = 0; i < numberOfKeys; i++) {
              region.get("key-" + i);
            }
          } catch (Exception ex) {
            // The cause may be absent; guard against that instead of hitting an NPE while
            // building the failure message, and keep the original exception as the cause.
            final Throwable cause = ex.getCause();
            if (!(cause instanceof CacheLoaderException)) {
              org.apache.geode.test.dunit.Assert.fail(
                  "UnExpected Exception, expected to receive CacheLoaderException from server, instead found: "
                      + (cause == null ? ex.getClass() : cause.getClass()),
                  ex);
            }
          }
          int newConnects = stats.getConnects();
          int newDisConnects = stats.getDisConnects();
          // NOTE(review): this only fails when BOTH counters changed; confirm that "&&" rather
          // than "||" is the intended condition.
          if (newConnects != oldConnects && newDisConnects != oldDisConnects) {
            fail("New connection has created for Server side CacheLoaderException.");
          }
        }
      };
  getSystem().getLogWriter().info("before initialize client");
  AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLaoder);
  ThreadUtils.join(inv2, 30 * 1000);
  SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  };
  server.invoke(stopServer);
  // Surface failures raised inside the client VM (same pattern as test003Bug36684); without
  // this check the assertions in the async runnable could never fail the test.
  if (inv2.exceptionOccurred()) {
    org.apache.geode.test.dunit.Assert.fail("Error occurred in client", inv2.getException());
  }
}
/**
 * Asserts that this VM is not a dedicated admin VM and that exactly one distributed system
 * exists, logging the existing systems first when there is more than one.
 */
protected void validateDS() {
  final List existingSystems = InternalDistributedSystem.getExistingSystems();
  final boolean moreThanOne = existingSystems.size() > 1;
  if (moreThanOne) {
    final String details = "validateDS: size=" + existingSystems.size() + " isDedicatedAdminVM="
        + ClusterDistributionManager.isDedicatedAdminVM() + " l=" + existingSystems;
    getSystem().getLogWriter().info(details);
  }
  assertFalse(ClusterDistributionManager.isDedicatedAdminVM());
  assertEquals(1, existingSystems.size());
}
/**
 * Tests the basic operations of the {@link Pool}: a client loads values through the server's
 * cache loader, overwrites them, a second client reads the overwritten values, and the first
 * client verifies the pool's attach count and destroy behaviour around closing its region.
 *
 * @since GemFire 3.5
 */
@Test
public void test006Pool() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  final VM serverVm = host.getVM(0);
  final VM writerClient = host.getVM(1);
  final VM readerClient = host.getVM(2);
  serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      final AttributesFactory serverFactory = new AttributesFactory();
      serverFactory.setScope(Scope.DISTRIBUTED_ACK);
      serverFactory.setConcurrencyChecksEnabled(false);
      // loader that answers every get with the key's string form
      serverFactory.setCacheLoader(new CacheLoader() {
        @Override
        public void close() {}

        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey().toString();
        }
      });
      createRegion(name, serverFactory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(serverVm.getHost());
  final SerializableRunnable createClientRegion = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      validateDS();
      final AttributesFactory clientFactory = new AttributesFactory();
      clientFactory.setScope(Scope.LOCAL);
      clientFactory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(clientFactory, host0, port, -1, true, -1, -1,
          null);
      createRegion(name, clientFactory.create());
    }
  };
  writerClient.invoke(createClientRegion);
  writerClient.invoke(new CacheSerializableRunnable("Get values") {
    @Override
    public void run2() throws CacheException {
      final Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        final Object loaded = region.get(Integer.valueOf(i));
        assertEquals(String.valueOf(i), loaded);
      }
    }
  });
  writerClient.invoke(new CacheSerializableRunnable("Update values") {
    @Override
    public void run2() throws CacheException {
      final Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        final Integer boxed = Integer.valueOf(i);
        region.put(boxed, boxed);
      }
    }
  });
  readerClient.invoke(createClientRegion);
  readerClient.invoke(new CacheSerializableRunnable("Validate values") {
    @Override
    public void run2() throws CacheException {
      final Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        final Object stored = region.get(Integer.valueOf(i));
        assertNotNull(stored);
        assertTrue(stored instanceof Integer);
        assertEquals(i, ((Integer) stored).intValue());
      }
    }
  });
  writerClient.invoke(new CacheSerializableRunnable("Close Pool") {
    // do some special close validation here
    @Override
    public void run2() throws CacheException {
      final Region region = getRootRegion().getSubregion(name);
      final String attachedPoolName = region.getAttributes().getPoolName();
      final PoolImpl attachedPool = (PoolImpl) PoolManager.find(attachedPoolName);
      assertEquals(false, attachedPool.isDestroyed());
      assertEquals(1, attachedPool.getAttachCount());
      // destroying a pool that still has a region attached must be rejected
      try {
        attachedPool.destroy();
        fail("expected IllegalStateException");
      } catch (IllegalStateException expected) {
      }
      region.localDestroyRegion();
      assertEquals(false, attachedPool.isDestroyed());
      assertEquals(0, attachedPool.getAttachCount());
    }
  });
  serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests the BridgeServer failover (bug 31832).
 *
 * Calls disconnectAllFromDS() first, then runs the shared failover scenario with a connection
 * count of 1.
 */
@Test
public void test007BridgeServerFailoverCnx1() throws CacheException {
disconnectAllFromDS();
basicTestBridgeServerFailover(1);
}
/**
 * Test BridgeServer failover with connectionsPerServer set to 0.
 *
 * Runs the same shared failover scenario as test007BridgeServerFailoverCnx1, passing 0 as the
 * connection count.
 */
@Test
public void test008BridgeServerFailoverCnx0() throws CacheException {
basicTestBridgeServerFailover(0);
}
/**
 * Shared body of the BridgeServer failover tests.
 *
 * <p>
 * Starts a cache server in vm0 and in vm1, creates a pooled client region in vm2 and launches an
 * asynchronous putter there. The server in vm1 is then stopped and restarted, and finally the
 * server in vm0 is stopped; after each step the server count seen by the client pool is checked.
 *
 * @param cnxCount connection count passed to
 *        {@code ClientServerTestCase.configureConnectionPool} when the client pool is configured
 * @throws CacheException if a cache operation fails
 */
private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  // Create two cache servers
  SerializableRunnable createCacheServer = new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  vm0.invoke(createCacheServer);
  vm1.invoke(createCacheServer);
  final int port0 = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  final int port1 = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());

  // Create one bridge client (executed in vm2)
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port0, port1, true, -1,
          cnxCount, null, 100);
      Region region = createRegion(name, factory.create());
      // force connections to form
      region.put("keyInit", Integer.valueOf(0));
      region.put("keyInit2", Integer.valueOf(0));
    }
  };
  vm2.invoke(create);

  // Launch async thread that puts objects into cache. This thread will execute until
  // the test has ended (which is why the RegionDestroyedException and CacheClosedException
  // are caught and ignored). If any other exception occurs, the test should fail; note that
  // the check on putAI further down is currently disabled (see the FIXME there).
  AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      try {
        for (int i = 0; i < 100000; i++) {
          region.put("keyAI", Integer.valueOf(i));
          try {
            Thread.sleep(100);
          } catch (InterruptedException ie) {
            // keep the interrupt status set for whoever handles the failure
            Thread.currentThread().interrupt();
            fail("interrupted");
          }
        }
      } catch (NoAvailableServersException ignore) {
        /* ignore */
      } catch (RegionDestroyedException e) { // will be thrown when the test ends
        /* ignore */
      } catch (CancelException e) { // will be thrown when the test ends
        /* ignore */
      }
    }
  });

  SerializableRunnable verify1Server = new CacheSerializableRunnable("verify1Server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      PoolImpl pool = getPool(region);
      verifyServerCount(pool, 1);
    }
  };
  SerializableRunnable verify2Servers = new CacheSerializableRunnable("verify2Servers") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      PoolImpl pool = getPool(region);
      verifyServerCount(pool, 2);
    }
  };
  vm2.invoke(verify2Servers);

  SerializableRunnable stopCacheServer = new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  };

  // Markers written to vm2's log around the bounce so that java.io.IOException is expected
  final String expected = "java.io.IOException";
  final String addExpected = "<ExpectedException action=add>" + expected + "</ExpectedException>";
  final String removeExpected =
      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
  vm2.invoke(new SerializableRunnable() {
    @Override
    public void run() {
      LogWriter bgexecLogger = new LocalLogWriter(ALL.intLevel(), System.out);
      bgexecLogger.info(addExpected);
    }
  });
  try { // make sure we removeExpected
    // Bounce the non-current server (I know that VM1 contains the non-current server
    // because ...
    vm1.invoke(stopCacheServer);
    vm2.invoke(verify1Server);
    final int restartPort = port1;
    vm1.invoke(new SerializableRunnable("Restart CacheServer") {
      @Override
      public void run() {
        try {
          Region region = getRootRegion().getSubregion(name);
          assertNotNull(region);
          startBridgeServer(restartPort);
        } catch (Exception e) {
          getSystem().getLogWriter().fine(new Exception(e));
          org.apache.geode.test.dunit.Assert.fail("Failed to start CacheServer", e);
        }
      }
    });
    // Wait for the client to see the bounced server again.
    vm2.invoke(verify2Servers);
  } finally {
    vm2.invoke(new SerializableRunnable() {
      @Override
      public void run() {
        LogWriter bgexecLogger = new LocalLogWriter(ALL.intLevel(), System.out);
        bgexecLogger.info(removeExpected);
      }
    });
  }

  // Stop the other cache server
  vm0.invoke(stopCacheServer);
  // Run awhile
  vm2.invoke(verify1Server);
  org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
      .info("FIXME: this thread does not terminate"); // FIXME
  // // Verify that no exception has occurred in the putter thread
  // join(putAI, 5 * 60 * 1000, getLogWriter());
  // //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred());
  // if (putAI.exceptionOccurred()) {
  // fail("While putting entries: ", putAI.getException());
  // }

  // Close Pool
  vm2.invoke(new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  });
  // Stop the last cache server
  vm1.invoke(stopCacheServer);
}
// Stop flag polled by the putter loops of basicTestLifetimeExpire; that test sets it to true to
// end the loops and resets it to false during cleanup.
protected static volatile boolean stopTestLifetimeExpire = false;
// Baseline PoolStats load-conditioning counters (check, extensions, connect, disconnect).
// basicTestLifetimeExpire's first putter records them before it starts putting, and its
// verification step compares the current stats against them.
protected static volatile int baselineLifetimeCheck;
protected static volatile int baselineLifetimeExtensions;
protected static volatile int baselineLifetimeConnect;
protected static volatile int baselineLifetimeDisconnect;
/**
 * Checks the load-conditioning statistics of a client pool while two putter threads run against
 * a single live cache server.
 *
 * <p>
 * Servers are started in vm0 and vm1, but the one in vm1 is stopped again right away (it was
 * only started to reserve a port). The client in vm2 is configured with both ports, two
 * asynchronous putters are started, and the pool statistics are then expected to show lifetime
 * checks and extensions but no additional connects or disconnects.
 *
 * @throws CacheException if a cache operation fails
 */
@Test
public void basicTestLifetimeExpire() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  AsyncInvocation putAI = null;
  AsyncInvocation putAI2 = null;
  try {
    // Create two cache servers
    SerializableRunnable createCacheServer =
        new CacheSerializableRunnable("Create Cache Server") {
          @Override
          public void run2() throws CacheException {
            AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
            factory.setCacheListener(new DelayListener(25));
            createRegion(name, factory.create());
            try {
              startBridgeServer(0);
            } catch (Exception ex) {
              org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
            }
          }
        };
    vm0.invoke(createCacheServer);
    final int port0 = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
    vm1.invoke(createCacheServer);
    final int port1 = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
    SerializableRunnable stopCacheServer = new SerializableRunnable("Stop CacheServer") {
      @Override
      public void run() {
        stopBridgeServer(getCache());
      }
    };
    // we only had to stop it to reserve a port
    vm1.invoke(stopCacheServer);

    // Create one bridge client (executed in vm2)
    SerializableRunnable create = new CacheSerializableRunnable("Create region") {
      @Override
      public void run2() throws CacheException {
        getLonerSystem();
        getCache();
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.LOCAL);
        factory.setConcurrencyChecksEnabled(false);
        ClientServerTestCase.configureConnectionPool(factory, host0, port0, port1,
            false/* queue */, -1, 0, null, 100, 500, 500);
        Region region = createRegion(name, factory.create());
        // force connections to form
        region.put("keyInit", Integer.valueOf(0));
        region.put("keyInit2", Integer.valueOf(0));
      }
    };
    vm2.invoke(create);

    // Launch async threads that put objects into cache. They keep running until
    // stopTestLifetimeExpire is set.
    SerializableRunnable putter1 = new CacheSerializableRunnable("Put objects") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(name);
        PoolImpl pool = getPool(region);
        PoolStats stats = pool.getStats();
        // record the baselines that verify1Server compares against
        baselineLifetimeCheck = stats.getLoadConditioningCheck();
        baselineLifetimeExtensions = stats.getLoadConditioningExtensions();
        baselineLifetimeConnect = stats.getLoadConditioningConnect();
        baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect();
        try {
          int count = 0;
          while (!stopTestLifetimeExpire) {
            count++;
            region.put("keyAI1", Integer.valueOf(count));
          }
        } catch (NoAvailableServersException ex) {
          // only tolerated once the test has asked the putters to stop
          if (stopTestLifetimeExpire) {
            return;
          } else {
            throw ex;
          }
        }
      }
    };
    SerializableRunnable putter2 = new CacheSerializableRunnable("Put objects") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(name);
        try {
          int count = 0;
          while (!stopTestLifetimeExpire) {
            count++;
            region.put("keyAI2", Integer.valueOf(count));
          }
        } catch (NoAvailableServersException ex) {
          // only tolerated once the test has asked the putters to stop
          if (stopTestLifetimeExpire) {
            return;
          } else {
            throw ex;
          }
        }
      }
    };
    putAI = vm2.invokeAsync(putter1);
    putAI2 = vm2.invokeAsync(putter2);

    SerializableRunnable verify1Server = new CacheSerializableRunnable("verify1Server") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(name);
        PoolImpl pool = getPool(region);
        final PoolStats stats = pool.getStats();
        verifyServerCount(pool, 1);
        WaitCriterion ev = new WaitCriterion() {
          @Override
          public boolean done() {
            return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck);
          }

          @Override
          public String description() {
            // give a timeout something useful to report instead of null
            return "expected stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck()
                + " to be >= " + (10 + baselineLifetimeCheck);
          }
        };
        GeodeAwaitility.await().untilAsserted(ev);
        // make sure no replacements are happening.
        // since we have 2 threads and 2 cnxs and 1 server
        // when lifetimes are up we should only want to connect back to the
        // server we are already connected to and thus just extend our lifetime
        assertTrue(
            "baselineLifetimeCheck=" + baselineLifetimeCheck
                + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(),
            stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck));
        baselineLifetimeCheck = stats.getLoadConditioningCheck();
        assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions);
        assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect);
        assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect);
      }
    };
    // NOTE: verify2Servers is defined but never invoked in this method.
    SerializableRunnable verify2Servers = new CacheSerializableRunnable("verify2Servers") {
      @Override
      public void run2() throws CacheException {
        Region region = getRootRegion().getSubregion(name);
        PoolImpl pool = getPool(region);
        final PoolStats stats = pool.getStats();
        verifyServerCount(pool, 2);
        // make sure some replacements are happening.
        // since we have 2 threads and 2 cnxs and 2 servers
        // when lifetimes are up we should connect to the other server sometimes.
        // TODO: does this WaitCriterion actually help?
        WaitCriterion wc = new WaitCriterion() {
          String excuse;

          @Override
          public boolean done() {
            int actual = stats.getLoadConditioningCheck();
            int expected = 10 + baselineLifetimeCheck;
            if (actual >= expected) {
              return true;
            }
            excuse = "Bug 39209 expected " + actual + " to be >= " + expected;
            return false;
          }

          @Override
          public String description() {
            return excuse;
          }
        };
        GeodeAwaitility.await().untilAsserted(wc);
        assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect);
        assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect);
      }
    };

    vm2.invoke(verify1Server);
    assertEquals(true, putAI.isAlive());
    assertEquals(true, putAI2.isAlive());
  } finally {
    // Ask both putter loops in vm2 to exit
    vm2.invoke(new SerializableRunnable("Stop Putters") {
      @Override
      public void run() {
        stopTestLifetimeExpire = true;
      }
    });
    try {
      if (putAI != null) {
        // Verify that no exception has occurred in the putter thread
        ThreadUtils.join(putAI, 30 * 1000);
        if (putAI.exceptionOccurred()) {
          org.apache.geode.test.dunit.Assert.fail("While putting entries: ",
              putAI.getException());
        }
      }
      if (putAI2 != null) {
        // Wait for the second putter itself rather than joining the first one again
        ThreadUtils.join(putAI2, 30 * 1000);
        // FIXME this thread does not terminate
        // if (putAI2.exceptionOccurred()) {
        // fail("While putting entries: ", putAI2.getException());
        // }
      }
    } finally {
      // Reset the stop flag in vm2 now that the putters have been dealt with
      vm2.invoke(new SerializableRunnable("Stop Putters") {
        @Override
        public void run() {
          stopTestLifetimeExpire = false;
        }
      });
      // Close Pool
      vm2.invoke(new CacheSerializableRunnable("Close Pool") {
        @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(name);
          String poolName = region.getAttributes().getPoolName();
          region.localDestroyRegion();
          PoolManager.find(poolName).destroy();
        }
      });
      SerializableRunnable stopCacheServer = new SerializableRunnable("Stop CacheServer") {
        @Override
        public void run() {
          stopBridgeServer(getCache());
        }
      };
      vm1.invoke(stopCacheServer);
      vm0.invoke(stopCacheServer);
    }
  }
}
/**
 * Tests the create operation of the {@link Pool}.
 *
 * <p>
 * A cache server runs in vm0. The client in vm1 creates ten Integer entries through its pooled
 * region, and a second client in vm2 then reads them back with {@code get} and validates them.
 *
 * @since GemFire 3.5
 */
@Test
public void test011PoolCreate() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Creates the pooled client region; reused for vm1 and vm2
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
      createRegion(name, factory.create());
    }
  };

  vm1.invoke(create);
  vm1.invoke(new CacheSerializableRunnable("Create values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.create(Integer.valueOf(i), Integer.valueOf(i));
      }
    }
  });

  vm2.invoke(create);
  vm2.invoke(new CacheSerializableRunnable("Validate values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get(Integer.valueOf(i));
        assertNotNull(value);
        assertTrue(value instanceof Integer);
        assertEquals(i, ((Integer) value).intValue());
      }
    }
  });

  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests the put operation of the {@link Pool}.
 *
 * <p>
 * A cache server runs in vm0. The client in vm1 puts String, {@code Order} and {@code byte[]}
 * values through its pooled region, and a second client in vm2 reads each kind back with
 * {@code get} and validates it.
 *
 * @since GemFire 3.5
 */
@Test
public void test012PoolPut() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Creates the pooled client region; reused for vm1 and vm2
  SerializableRunnable createPool = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      // create bridge writer
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
      createRegion(name, factory.create());
    }
  };

  vm1.invoke(createPool);
  vm1.invoke(new CacheSerializableRunnable("Put values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        // put string values
        region.put("key-string-" + i, "value-" + i);
        // put object values
        Order order = new Order();
        order.init(i);
        region.put("key-object-" + i, order);
        // put byte[] values; explicit charset so the reader in the other JVM decodes the same way
        region.put("key-bytes-" + i,
            ("value-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
      }
    }
  });

  vm2.invoke(createPool);
  vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get("key-string-" + i);
        assertNotNull(value);
        assertTrue(value instanceof String);
        assertEquals("value-" + i, value);
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get("key-object-" + i);
        assertNotNull(value);
        assertTrue(value instanceof Order);
        assertEquals(i, ((Order) value).getIndex());
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get("key-bytes-" + i);
        assertNotNull(value);
        assertTrue(value instanceof byte[]);
        // decode with the same charset the writer used
        assertEquals("value-" + i,
            new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
      }
    }
  });

  SerializableRunnable closePool = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(closePool);
  vm2.invoke(closePool);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests the put operation of the {@link Pool}.
 *
 * <p>
 * A cache server runs in vm0. The client in vm1 puts String, {@code Order} and {@code byte[]}
 * values through its pooled region, and a second client in vm2 reads each kind back with
 * {@code get} and validates it. The test ends with a five second pause after the server has been
 * stopped.
 *
 * <p>
 * NOTE(review): the method name mentions "NoDeserialize", but nothing in this body configures or
 * asserts anything about deserialization -- confirm the intended coverage.
 *
 * @since GemFire 3.5
 */
@Test
public void test013PoolPutNoDeserialize() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Creates the pooled client region; reused for vm1 and vm2
  SerializableRunnable createPool = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
      createRegion(name, factory.create());
    }
  };

  vm1.invoke(createPool);
  vm1.invoke(new CacheSerializableRunnable("Put values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        // put string values
        region.put("key-string-" + i, "value-" + i);
        // put object values
        Order order = new Order();
        order.init(i);
        region.put("key-object-" + i, order);
        // put byte[] values; explicit charset so the reader in the other JVM decodes the same way
        region.put("key-bytes-" + i,
            ("value-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
      }
    }
  });

  vm2.invoke(createPool);
  vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get("key-string-" + i);
        assertNotNull(value);
        assertTrue(value instanceof String);
        assertEquals("value-" + i, value);
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get("key-object-" + i);
        assertNotNull(value);
        assertTrue(value instanceof Order);
        assertEquals(i, ((Order) value).getIndex());
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object value = region.get("key-bytes-" + i);
        assertNotNull(value);
        assertTrue(value instanceof byte[]);
        // decode with the same charset the writer used
        assertEquals("value-" + i,
            new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
      }
    }
  });

  SerializableRunnable closePool = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(closePool);
  vm2.invoke(closePool);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
  Wait.pause(5 * 1000);
}
/**
 * Tests that invalidates and destroys are propagated to {@link Pool}s.
 *
 * <p>
 * A cache server runs in vm0; vm1 and vm2 each create a pooled client region with a
 * {@code CertifiableTestCacheListener} and register interest in all keys. vm2 then updates,
 * destroys and re-creates ten entries, and after each step the listener in vm1 is checked for
 * the events it recorded (invalidates, destroys, and no events for the re-creates until vm1
 * fetches the values itself).
 *
 * @since GemFire 3.5
 */
@Test
public void test014InvalidateAndDestroyPropagation() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  // Creates the pooled client region with a history-recording listener; reused for vm1 and vm2
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      CertifiableTestCacheListener l = new CertifiableTestCacheListener(
          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter());
      factory.setCacheListener(l);
      Region rgn = createRegion(name, factory.create());
      rgn.registerInterestRegex(".*", false, false);
    }
  };

  vm1.invoke(create);
  vm1.invoke(new CacheSerializableRunnable("Populate region") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.put(Integer.valueOf(i), "old" + i);
      }
    }
  });
  vm2.invoke(create);
  Wait.pause(5 * 1000);

  vm1.invoke(new CacheSerializableRunnable("Turn on history") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      CertifiableTestCacheListener ctl =
          (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
      ctl.enableEventHistory();
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Update region") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.put(Integer.valueOf(i), "new" + i, "callbackArg" + i);
      }
    }
  });
  Wait.pause(5 * 1000);

  vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      CertifiableTestCacheListener ctl =
          (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
      // every updated key must still have an entry here, but with no value
      for (int i = 0; i < 10; i++) {
        Object key = Integer.valueOf(i);
        ctl.waitForInvalidated(key);
        Region.Entry entry = region.getEntry(key);
        assertNotNull(entry);
        assertNull(entry.getValue());
      }
      {
        List l = ctl.getEventHistory();
        assertEquals(10, l.size());
        for (int i = 0; i < 10; i++) {
          Object key = Integer.valueOf(i);
          EntryEvent ee = (EntryEvent) l.get(i);
          assertEquals(key, ee.getKey());
          assertEquals("old" + i, ee.getOldValue());
          assertEquals(Operation.INVALIDATE, ee.getOperation());
          assertEquals("callbackArg" + i, ee.getCallbackArgument());
          assertEquals(true, ee.isOriginRemote());
        }
      }
    }
  });

  vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object key = Integer.valueOf(i);
        assertEquals("new" + i, region.getEntry(key).getValue());
        region.destroy(key, "destroyCB" + i);
      }
    }
  });
  Wait.pause(5 * 1000);

  vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      CertifiableTestCacheListener ctl =
          (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
      for (int i = 0; i < 10; i++) {
        Object key = Integer.valueOf(i);
        ctl.waitForDestroyed(key);
        Region.Entry entry = region.getEntry(key);
        assertNull(entry);
      }
      {
        List l = ctl.getEventHistory();
        assertEquals(10, l.size());
        for (int i = 0; i < 10; i++) {
          Object key = Integer.valueOf(i);
          EntryEvent ee = (EntryEvent) l.get(i);
          assertEquals(key, ee.getKey());
          assertEquals(null, ee.getOldValue());
          assertEquals(Operation.DESTROY, ee.getOperation());
          assertEquals("destroyCB" + i, ee.getCallbackArgument());
          assertEquals(true, ee.isOriginRemote());
        }
      }
    }
  });

  vm2.invoke(new CacheSerializableRunnable("recreate") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Object key = Integer.valueOf(i);
        region.create(key, "create" + i);
      }
    }
  });
  Wait.pause(5 * 1000);

  vm1.invoke(new CacheSerializableRunnable("Verify creates") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      CertifiableTestCacheListener ctl =
          (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
      // the remote creates are expected to produce no listener events here
      List l = ctl.getEventHistory();
      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
          .info("history (should be empty): " + l);
      assertEquals(0, l.size());
      // now see if we can get it from the server
      for (int i = 0; i < 10; i++) {
        Object key = Integer.valueOf(i);
        assertEquals("create" + i, region.get(key, "loadCB" + i));
      }
      // each get above should have been recorded as a local load-create
      l = ctl.getEventHistory();
      assertEquals(10, l.size());
      for (int i = 0; i < 10; i++) {
        Object key = Integer.valueOf(i);
        EntryEvent ee = (EntryEvent) l.get(i);
        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("processing " + ee);
        assertEquals(key, ee.getKey());
        assertEquals(null, ee.getOldValue());
        assertEquals("create" + i, ee.getNewValue());
        assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
        assertEquals("loadCB" + i, ee.getCallbackArgument());
        assertEquals(false, ee.isOriginRemote());
      }
    }
  });

  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
* Tests that invalidates and destroys are propagated to {@link Pool}s correctly to
* DataPolicy.EMPTY + InterestPolicy.ALL
*
* @since GemFire 5.0
*/
@Test
public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
final String name = this.getName();
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory.create());
// pause(1000);
try {
startBridgeServer(0);
} catch (Exception ex) {
org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
}
}
});
final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
SerializableRunnable createEmpty = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
CertifiableTestCacheListener l = new CertifiableTestCacheListener(
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter());
factory.setCacheListener(l);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
Region rgn = createRegion(name, factory.create());
rgn.registerInterestRegex(".*", false, false);
}
};
SerializableRunnable createNormal = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
CertifiableTestCacheListener l = new CertifiableTestCacheListener(
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter());
factory.setCacheListener(l);
Region rgn = createRegion(name, factory.create());
rgn.registerInterestRegex(".*", false, false);
}
};
vm1.invoke(createEmpty);
vm1.invoke(new CacheSerializableRunnable("Populate region") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(new Integer(i), "old" + i);
}
}
});
vm2.invoke(createNormal);
vm1.invoke(new CacheSerializableRunnable("Turn on history") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
ctl.enableEventHistory();
}
});
vm2.invoke(new CacheSerializableRunnable("Update region") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(new Integer(i), "new" + i, "callbackArg" + i);
}
}
});
Wait.pause(5 * 1000);
vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
ctl.waitForInvalidated(key);
Region.Entry entry = region.getEntry(key);
assertNull(entry); // we are empty!
}
{
List l = ctl.getEventHistory();
assertEquals(10, l.size());
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
EntryEvent ee = (EntryEvent) l.get(i);
assertEquals(key, ee.getKey());
assertEquals(null, ee.getOldValue());
assertEquals(false, ee.isOldValueAvailable()); // failure
assertEquals(Operation.INVALIDATE, ee.getOperation());
assertEquals("callbackArg" + i, ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
}
}
});
vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
assertEquals("new" + i, region.getEntry(key).getValue());
region.destroy(key, "destroyCB" + i);
}
}
});
Wait.pause(5 * 1000);
vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
ctl.waitForDestroyed(key);
Region.Entry entry = region.getEntry(key);
assertNull(entry);
}
{
List l = ctl.getEventHistory();
assertEquals(10, l.size());
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
EntryEvent ee = (EntryEvent) l.get(i);
assertEquals(key, ee.getKey());
assertEquals(null, ee.getOldValue());
assertEquals(false, ee.isOldValueAvailable());
assertEquals(Operation.DESTROY, ee.getOperation());
assertEquals("destroyCB" + i, ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
}
}
});
vm2.invoke(new CacheSerializableRunnable("recreate") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
region.create(key, "create" + i, "createCB" + i);
}
}
});
Wait.pause(5 * 1000);
vm1.invoke(new CacheSerializableRunnable("Verify creates") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
ctl.waitForInvalidated(key);
Region.Entry entry = region.getEntry(key);
assertNull(entry);
}
List l = ctl.getEventHistory();
assertEquals(10, l.size());
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
EntryEvent ee = (EntryEvent) l.get(i);
assertEquals(key, ee.getKey());
assertEquals(null, ee.getOldValue());
assertEquals(false, ee.isOldValueAvailable());
assertEquals(Operation.INVALIDATE, ee.getOperation());
assertEquals("createCB" + i, ee.getCallbackArgument());
assertEquals(true, ee.isOriginRemote());
}
// now see if we can get it from the server
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
assertEquals("create" + i, region.get(key, "loadCB" + i));
}
l = ctl.getEventHistory();
assertEquals(10, l.size());
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
EntryEvent ee = (EntryEvent) l.get(i);
assertEquals(key, ee.getKey());
assertEquals(null, ee.getOldValue());
assertEquals("create" + i, ee.getNewValue());
assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
assertEquals("loadCB" + i, ee.getCallbackArgument());
assertEquals(false, ee.isOriginRemote());
}
}
});
SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}
};
vm1.invoke(close);
vm2.invoke(close);
vm0.invoke(new SerializableRunnable("Stop CacheServer") {
@Override
public void run() {
stopBridgeServer(getCache());
}
});
}
/**
 * Tests event propagation to a client region configured with DataPolicy.EMPTY +
 * InterestPolicy.CACHE_CONTENT.
 *
 * <p>
 * vm0 hosts the cache server, vm1 is the empty client and vm2 is a client with a default data
 * policy. The test asserts that puts, destroys and creates performed by vm2 add nothing to the
 * event history of vm1's listener, and that a later {@code get} on vm1 returns the value created
 * by vm2 and records a LOCAL_LOAD_CREATE event per key.
 *
 * @since GemFire 5.0
 */
@Test
public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
final String name = this.getName();
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
// vm0: create the server-side region and start the cache server
vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
createRegion(name, factory.create());
try {
startBridgeServer(0);
} catch (Exception ex) {
org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
}
}
});
final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// Client region definition for vm1: DataPolicy.EMPTY with InterestPolicy.CACHE_CONTENT
SerializableRunnable createEmpty = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
CertifiableTestCacheListener l = new CertifiableTestCacheListener(
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter());
factory.setCacheListener(l);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
Region rgn = createRegion(name, factory.create());
rgn.registerInterestRegex(".*", false, false);
}
};
// Client region definition for vm2: same pool and listener setup, data policy left at its default
SerializableRunnable createNormal = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
CertifiableTestCacheListener l = new CertifiableTestCacheListener(
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter());
factory.setCacheListener(l);
Region rgn = createRegion(name, factory.create());
rgn.registerInterestRegex(".*", false, false);
}
};
vm1.invoke(createEmpty);
// Seed keys 0..9 through the empty client before vm2 connects
vm1.invoke(new CacheSerializableRunnable("Populate region") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(new Integer(i), "old" + i);
}
}
});
vm2.invoke(createNormal);
// Start recording listener events on vm1 only after the initial population
vm1.invoke(new CacheSerializableRunnable("Turn on history") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
ctl.enableEventHistory();
}
});
vm2.invoke(new CacheSerializableRunnable("Update region") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put(new Integer(i), "new" + i, "callbackArg" + i);
}
}
});
// Give any (unexpected) events time to arrive before asserting that none did
Wait.pause(5 * 1000);
vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
List l = ctl.getEventHistory();
assertEquals(0, l.size());
}
});
vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
assertEquals("new" + i, region.getEntry(key).getValue());
region.destroy(key, "destroyCB" + i);
}
}
});
// NOTE(review): unlike the update and create phases, there is no pause before this check, so a
// late-arriving destroy event would not be caught here -- confirm whether that is intentional.
vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
List l = ctl.getEventHistory();
assertEquals(0, l.size());
}
});
vm2.invoke(new CacheSerializableRunnable("recreate") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
region.create(key, "create" + i, "createCB" + i);
}
}
});
Wait.pause(5 * 1000);
vm1.invoke(new CacheSerializableRunnable("Verify creates") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
// the creates done by vm2 must not have produced any listener event here
List l = ctl.getEventHistory();
assertEquals(0, l.size());
// now see if we can get it from the server
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
assertEquals("create" + i, region.get(key, "loadCB" + i));
}
// each get above is expected to have recorded one locally originated load-create event
l = ctl.getEventHistory();
assertEquals(10, l.size());
for (int i = 0; i < 10; i++) {
Object key = new Integer(i);
EntryEvent ee = (EntryEvent) l.get(i);
assertEquals(key, ee.getKey());
assertEquals(null, ee.getOldValue());
assertEquals("create" + i, ee.getNewValue());
assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
assertEquals("loadCB" + i, ee.getCallbackArgument());
assertEquals(false, ee.isOriginRemote());
}
}
});
// Tear down: locally destroy the client regions, then stop the server
SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}
};
vm1.invoke(close);
vm2.invoke(close);
vm0.invoke(new SerializableRunnable("Stop CacheServer") {
@Override
public void run() {
stopBridgeServer(getCache());
}
});
}
/**
 * Tests that destroys caused by entry expiration on the server are delivered to a subscribed
 * client with a callback argument.
 *
 * <p>
 * The server region uses an entry time-to-live with {@link ExpirationAction#DESTROY} and
 * {@code EntryExpiryTask.expireSendsEntryAsCallback} set to true. Five entries are put from the
 * client and five on the server; after a pause the client listener's history must contain ten
 * DESTROY events whose callback argument is a String.
 */
@Test
public void test017ExpireDestroyHasEntryInCallback() throws CacheException {
disconnectAllFromDS();
final String name = this.getName();
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
// Create cache server
vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
// In lieu of System.setProperty("gemfire.EXPIRE_SENDS_ENTRY_AS_CALLBACK", "true");
// NOTE(review): this static flag is not reset anywhere in this method, so it may leak into
// later tests that run in the same VM -- confirm teardown handles it.
EntryExpiryTask.expireSendsEntryAsCallback = true;
AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
// timeout of 1 (presumably seconds -- TODO confirm) with a DESTROY action
factory.setEntryTimeToLive(new ExpirationAttributes(1, ExpirationAction.DESTROY));
createRegion(name, factory.create());
try {
startBridgeServer(0);
} catch (Exception ex) {
org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
}
}
});
// Create cache server clients
final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
SerializableRunnable createClient = new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setSubscriptionAttributes(new SubscriptionAttributes((InterestPolicy.ALL)));
factory.setConcurrencyChecksEnabled(false);
// configure the connection pool to the server started above
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
CertifiableTestCacheListener l = new CertifiableTestCacheListener(
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter());
factory.setCacheListener(l);
Region r = createRegion(name, factory.create());
r.registerInterest("ALL_KEYS");
}
};
vm1.invoke(createClient);
// Record every listener event on the client from here on
vm1.invoke(new CacheSerializableRunnable("Turn on history") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
ctl.enableEventHistory();
}
});
Wait.pause(500);
// Create some entries on the client
vm1.invoke(new CacheSerializableRunnable("Create entries") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 5; i++) {
region.put("key-client-" + i, "value-client-" + i);
}
}
});
// Create some entries on the server
vm0.invoke(new CacheSerializableRunnable("Create entries") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
for (int i = 0; i < 5; i++) {
region.put("key-server-" + i, "value-server-" + i);
}
}
});
// Wait for expiration
Wait.pause(2000);
vm1.invoke(new CacheSerializableRunnable("Validate listener events") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
CertifiableTestCacheListener ctl =
(CertifiableTestCacheListener) region.getAttributes().getCacheListener();
int destroyCallbacks = 0;
List<CacheEvent> l = ctl.getEventHistory();
// count only DESTROY events that carry a String callback argument
for (CacheEvent ce : l) {
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("--->>> " + ce);
if (ce.getOperation() == Operation.DESTROY
&& ce.getCallbackArgument() instanceof String) {
destroyCallbacks++;
}
}
// 5 client-created + 5 server-created entries
assertEquals(10, destroyCallbacks);
}
});
// Close cache server clients
SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
@Override
public void run2() throws CacheException {
Region region = getRootRegion().getSubregion(name);
region.localDestroyRegion();
}
};
vm1.invoke(close);
// Stop cache server
vm0.invoke(new SerializableRunnable("Stop CacheServer") {
@Override
public void run() {
stopBridgeServer(getCache());
}
});
}
/**
 * Builds the attributes factory used for a cache server region: DISTRIBUTED_ACK scope with
 * concurrency checks disabled.
 *
 * @param cl cache loader to install, or {@code null} to leave the loader unset
 * @param cw cache writer to install, or {@code null} to leave the writer unset
 * @return a new, not yet created, {@link AttributesFactory}
 */
public AttributesFactory getBridgeServerRegionAttributes(CacheLoader cl, CacheWriter cw) {
  final AttributesFactory serverFactory = new AttributesFactory();
  final boolean hasLoader = cl != null;
  final boolean hasWriter = cw != null;
  if (hasLoader) {
    serverFactory.setCacheLoader(cl);
  }
  if (hasWriter) {
    serverFactory.setCacheWriter(cw);
  }
  serverFactory.setScope(Scope.DISTRIBUTED_ACK);
  serverFactory.setConcurrencyChecksEnabled(false);
  return serverFactory;
}
/**
 * Builds the attributes factory used for a replicated cache server region: DISTRIBUTED_NO_ACK
 * scope, {@link DataPolicy#REPLICATE} and concurrency checks disabled.
 *
 * @param cl cache loader to install, or {@code null} to leave the loader unset
 * @param cw cache writer to install, or {@code null} to leave the writer unset
 * @return a new, not yet created, {@link AttributesFactory}
 */
public AttributesFactory getBridgeServerMirroredRegionAttributes(CacheLoader cl, CacheWriter cw) {
  final AttributesFactory mirroredFactory = new AttributesFactory();
  final boolean hasLoader = cl != null;
  final boolean hasWriter = cw != null;
  if (hasLoader) {
    mirroredFactory.setCacheLoader(cl);
  }
  if (hasWriter) {
    mirroredFactory.setCacheWriter(cw);
  }
  mirroredFactory.setScope(Scope.DISTRIBUTED_NO_ACK);
  mirroredFactory.setDataPolicy(DataPolicy.REPLICATE);
  mirroredFactory.setConcurrencyChecksEnabled(false);
  return mirroredFactory;
}
/**
 * Builds the attributes factory used for a replicated cache server region with acknowledged
 * distribution: DISTRIBUTED_ACK scope, {@link DataPolicy#REPLICATE} and concurrency checks
 * disabled.
 *
 * <p>
 * Replication is requested through {@code setDataPolicy}, matching
 * {@code getBridgeServerMirroredRegionAttributes}, instead of the deprecated
 * {@code setMirrorType(MirrorType.KEYS_VALUES)}.
 *
 * @param cl cache loader to install, or {@code null} to leave the loader unset
 * @param cw cache writer to install, or {@code null} to leave the writer unset
 * @return a new, not yet created, {@link AttributesFactory}
 */
public AttributesFactory getBridgeServerMirroredAckRegionAttributes(CacheLoader cl,
    CacheWriter cw) {
  AttributesFactory ret = new AttributesFactory();
  if (cl != null) {
    ret.setCacheLoader(cl);
  }
  if (cw != null) {
    ret.setCacheWriter(cw);
  }
  ret.setScope(Scope.DISTRIBUTED_ACK);
  ret.setDataPolicy(DataPolicy.REPLICATE);
  ret.setConcurrencyChecksEnabled(false);
  return ret;
}
/**
 * Tests that updates are not sent to VMs that did not ask for them.
 *
 * <p>
 * The server hosts two regions. vm1 registers interest in both; vm2 registers interest only in
 * the first. After vm1 updates every key in both regions, vm2 must see the new values in the
 * first region and still hold its own old values in the second.
 */
@Test
public void test018OnlyRequestedUpdates() throws Exception {
final String name1 = this.getName() + "-1";
final String name2 = this.getName() + "-2";
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
// Cache server serves up both regions
vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
createRegion(name1, factory.create());
createRegion(name2, factory.create());
try {
startBridgeServer(0);
} catch (Exception ex) {
org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
}
}
});
final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
// vm1 sends updates to the server
vm1.invoke(new CacheSerializableRunnable("Create regions") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
Region rgn = createRegion(name1, factory.create());
rgn.registerInterestRegex(".*", false, false);
rgn = createRegion(name2, factory.create());
rgn.registerInterestRegex(".*", false, false);
}
});
// vm2 only wants updates to region1
vm2.invoke(new CacheSerializableRunnable("Create region") {
@Override
public void run2() throws CacheException {
getLonerSystem();
getCache();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
Region rgn = createRegion(name1, factory.create());
rgn.registerInterestRegex(".*", false, false);
createRegion(name2, factory.create());
// no interest registration for region 2
}
});
// Both clients put the same "Old" values for keys 0..9 into both regions
SerializableRunnable populate = new CacheSerializableRunnable("Populate region") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(name1);
for (int i = 0; i < 10; i++) {
region1.put(new Integer(i), "Region1Old" + i);
}
Region region2 = getRootRegion().getSubregion(name2);
for (int i = 0; i < 10; i++) {
region2.put(new Integer(i), "Region2Old" + i);
}
}
};
vm1.invoke(populate);
vm2.invoke(populate);
// vm1 overwrites every key in both regions with "New" values
vm1.invoke(new CacheSerializableRunnable("Update") {
@Override
public void run2() throws CacheException {
Region region1 = getRootRegion().getSubregion(name1);
for (int i = 0; i < 10; i++) {
region1.put(new Integer(i), "Region1New" + i);
}
Region region2 = getRootRegion().getSubregion(name2);
for (int i = 0; i < 10; i++) {
region2.put(new Integer(i), "Region2New" + i);
}
}
});
// Wait for updates to be propagated
Wait.pause(5 * 1000);
vm2.invoke(new CacheSerializableRunnable("Validate") {
@Override
public void run2() throws CacheException {
// region1: interest was registered, so vm1's new values are expected
Region region1 = getRootRegion().getSubregion(name1);
for (int i = 0; i < 10; i++) {
assertEquals("Region1New" + i, region1.get(new Integer(i)));
}
// region2: no interest was registered, so vm2's own old values are expected
Region region2 = getRootRegion().getSubregion(name2);
for (int i = 0; i < 10; i++) {
assertEquals("Region2Old" + i, region2.get(new Integer(i)));
}
}
});
vm1.invoke(new CacheSerializableRunnable("Close Pool") {
@Override
public void run2() throws CacheException {
// Terminate region1's Pool
Region region1 = getRootRegion().getSubregion(name1);
region1.localDestroyRegion();
// Terminate region2's Pool
Region region2 = getRootRegion().getSubregion(name2);
region2.localDestroyRegion();
}
});
// NOTE(review): vm2 only destroys region1 here; its region2 is left in place -- confirm that
// test teardown cleans it up.
vm2.invoke(new CacheSerializableRunnable("Close Pool") {
@Override
public void run2() throws CacheException {
// Terminate region1's Pool
Region region1 = getRootRegion().getSubregion(name1);
region1.localDestroyRegion();
}
});
vm0.invoke(new SerializableRunnable("Stop CacheServer") {
@Override
public void run() {
stopBridgeServer(getCache());
}
});
}
/**
 * Tests interest registration on individual keys.
 *
 * <p>
 * The server region uses a loader that returns the key as its value. vm1 registers interest in
 * "key-1" and vm2 in "key-2". The test then checks, in both directions, that a client only
 * receives the other client's update for the key it registered, that no updates arrive once
 * interest has been unregistered, and that unregistering a key a second time does not fail.
 */
@Test
public void test019InterestKeyRegistration() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  // Create cache server
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // loader that answers every miss with the key itself
      CacheLoader cl = new CacheLoader() {
        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey();
        }

        @Override
        public void close() {}
      };
      AttributesFactory factory = getBridgeServerRegionAttributes(cl, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  // Create cache server clients
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      // configure the connection pool to the server started above
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm2.invoke(create);
  // Get values for key 1 and key 2 so that there are entries in the clients.
  // Register interest in one of the keys.
  vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      assertEquals("key-1", region.get("key-1"));
      assertEquals("key-2", region.get("key-2"));
      try {
        region.registerInterest("key-1");
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex);
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      assertEquals("key-1", region.get("key-1"));
      assertEquals("key-2", region.get("key-2"));
      try {
        region.registerInterest("key-2");
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex);
      }
    }
  });
  // Put new values and validate updates (VM1)
  vm1.invoke(new CacheSerializableRunnable("Put New Values") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.put("key-1", "vm1-key-1");
      region.put("key-2", "vm1-key-2");
      // Verify that the local entries hold the values just put
      assertEquals("vm1-key-1", region.getEntry("key-1").getValue());
      assertEquals("vm1-key-2", region.getEntry("key-2").getValue());
    }
  });
  Wait.pause(500);
  vm2.invoke(new CacheSerializableRunnable("Validate Entries") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      // Verify that 'key-2' was updated, but 'key-1' was not
      // and contains the original value
      assertEquals("key-1", region.getEntry("key-1").getValue());
      assertEquals("vm1-key-2", region.getEntry("key-2").getValue());
    }
  });
  // Put new values and validate updates (VM2)
  vm2.invoke(new CacheSerializableRunnable("Put New Values") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.put("key-1", "vm2-key-1");
      region.put("key-2", "vm2-key-2");
      // Verify that the local entries hold the values just put
      assertEquals("vm2-key-1", region.getEntry("key-1").getValue());
      assertEquals("vm2-key-2", region.getEntry("key-2").getValue());
    }
  });
  Wait.pause(500);
  vm1.invoke(new CacheSerializableRunnable("Validate Entries") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      // Verify that 'key-1' was updated, but 'key-2' was not
      // and contains the original value
      assertEquals("vm1-key-2", region.getEntry("key-2").getValue());
      assertEquals("vm2-key-1", region.getEntry("key-1").getValue());
    }
  });
  // Unregister interest
  vm1.invoke(new CacheSerializableRunnable("Unregister Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      try {
        region.unregisterInterest("key-1");
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While unregistering interest: ", ex);
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Unregister Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      try {
        region.unregisterInterest("key-2");
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While unregistering interest: ", ex);
      }
    }
  });
  // Put new values and validate updates (VM1)
  vm1.invoke(new CacheSerializableRunnable("Put New Values") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.put("key-1", "vm1-key-1-again");
      region.put("key-2", "vm1-key-2-again");
      // Verify that the local entries hold the values just put
      assertEquals("vm1-key-1-again", region.getEntry("key-1").getValue());
      assertEquals("vm1-key-2-again", region.getEntry("key-2").getValue());
    }
  });
  Wait.pause(500);
  vm2.invoke(new CacheSerializableRunnable("Validate Entries") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      // Verify that neither 'key-1' nor 'key-2' was updated
      // and both still contain the values vm2 put itself
      assertEquals("vm2-key-1", region.getEntry("key-1").getValue());
      assertEquals("vm2-key-2", region.getEntry("key-2").getValue());
    }
  });
  // Put new values and validate updates (VM2)
  vm2.invoke(new CacheSerializableRunnable("Put New Values") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.put("key-1", "vm2-key-1-again");
      region.put("key-2", "vm2-key-2-again");
      // Verify that the local entries hold the values just put
      assertEquals("vm2-key-1-again", region.getEntry("key-1").getValue());
      assertEquals("vm2-key-2-again", region.getEntry("key-2").getValue());
    }
  });
  Wait.pause(500);
  vm1.invoke(new CacheSerializableRunnable("Validate Entries") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      // Verify that neither 'key-1' nor 'key-2' was updated
      // and both still contain the values vm1 put itself
      assertEquals("vm1-key-1-again", region.getEntry("key-1").getValue());
      assertEquals("vm1-key-2-again", region.getEntry("key-2").getValue());
    }
  });
  // Unregister interest again (to verify that a client can unregister interest
  // in a key that it is not interested in with no problem).
  vm1.invoke(new CacheSerializableRunnable("Unregister Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      try {
        region.unregisterInterest("key-1");
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While unregistering interest: ", ex);
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Unregister Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      try {
        region.unregisterInterest("key-2");
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While unregistering interest: ", ex);
      }
    }
  });
  // Close cache server clients
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);
  // Stop cache server
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests interest list registration.
 *
 * <p>
 * The server region uses a loader that returns the key as its value. vm1 loads "key-1" and
 * "key-6" and registers interest in the list key-1..key-5. After vm2 puts new values for both
 * keys, vm1 must see the update for "key-1" (in the list) and keep its original value for
 * "key-6" (not in the list).
 */
@Test
public void test020InterestListRegistration() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  // Create cache server
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // loader that answers every miss with the key itself
      CacheLoader cl = new CacheLoader() {
        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey();
        }

        @Override
        public void close() {}
      };
      AttributesFactory factory = getBridgeServerRegionAttributes(cl, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  // Create cache server clients
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      // configure the connection pool to the server started above
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm2.invoke(create);
  // Get values for key 1 and key 6 so that there are entries in the clients.
  // Register interest in a list of keys.
  vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      assertEquals("key-1", region.get("key-1"));
      assertEquals("key-6", region.get("key-6"));
      try {
        List<String> list = new ArrayList<>();
        list.add("key-1");
        list.add("key-2");
        list.add("key-3");
        list.add("key-4");
        list.add("key-5");
        region.registerInterest(list);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex);
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      // vm2 only loads the entries; it registers no interest
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      assertEquals("key-1", region.get("key-1"));
      assertEquals("key-6", region.get("key-6"));
    }
  });
  // Put new values and validate updates (VM2)
  vm2.invoke(new CacheSerializableRunnable("Put New Values") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.put("key-1", "vm2-key-1");
      region.put("key-6", "vm2-key-6");
      // Verify that the local entries hold the values just put
      assertEquals("vm2-key-1", region.getEntry("key-1").getValue());
      assertEquals("vm2-key-6", region.getEntry("key-6").getValue());
    }
  });
  Wait.pause(5 * 1000);
  vm1.invoke(new CacheSerializableRunnable("Validate Entries") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      // Verify that 'key-1' was updated
      assertEquals("vm2-key-1", region.getEntry("key-1").getValue());
      // Verify that 'key-6' was not invalidated
      assertEquals("key-6", region.getEntry("key-6").getValue());
    }
  });
  // Close cache server clients
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);
  // Stop cache server
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Serializable wrapper around a single String key.
 */
protected class ConnectionPoolDUnitTestSerializable2 implements java.io.Serializable {
  /** The wrapped key, assigned by the constructor. */
  protected String _key;

  protected ConnectionPoolDUnitTestSerializable2(String key) {
    this._key = key;
  }

  /**
   * @return the key this instance was constructed with
   */
  public String getKey() {
    return this._key;
  }
}
/**
 * Returns the value of the static {@code bridgeServerPort} field in the VM where it is called.
 * Tests in this class invoke it remotely, e.g.
 * {@code vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())}, to learn the port of
 * the cache server running in that VM.
 *
 * <p>
 * Accessed by reflection DO NOT REMOVE
 *
 * @return the current value of {@code bridgeServerPort}
 */
protected static int getCacheServerPort() {
return bridgeServerPort;
}
/**
 * Returns the current value of the static {@code numberOfAfterCreates} field (presumably a count
 * of afterCreate listener callbacks maintained elsewhere in this class -- verify against the
 * code that writes it).
 */
protected static long getNumberOfAfterCreates() {
return numberOfAfterCreates;
}
/**
 * Returns the current value of the static {@code numberOfAfterUpdates} field (presumably a count
 * of afterUpdate listener callbacks maintained elsewhere in this class -- verify against the
 * code that writes it).
 */
protected static long getNumberOfAfterUpdates() {
return numberOfAfterUpdates;
}
/**
 * Returns the current value of the static {@code numberOfAfterInvalidates} field (presumably a
 * count of afterInvalidate listener callbacks maintained elsewhere in this class -- verify
 * against the code that writes it).
 */
protected static long getNumberOfAfterInvalidates() {
return numberOfAfterInvalidates;
}
/**
 * Opens the {@link DynamicRegionFactory} with a configuration bound to the given connection pool
 * and then returns the cache obtained from {@code getCache()}.
 *
 * @param testName not used by this method
 * @param connectionPoolName name of the pool handed to the dynamic region configuration
 * @return the cache returned by {@code getCache()}
 * @since GemFire 4.3
 */
protected Cache createDynamicRegionCache(String testName, String connectionPoolName) {
  final DynamicRegionFactory dynamicRegions = DynamicRegionFactory.get();
  // note that clients use non-persistent dr factories.
  final DynamicRegionFactory.Config clientConfig =
      new DynamicRegionFactory.Config(null, connectionPoolName, false, true);
  dynamicRegions.open(clientConfig);
  org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("CREATED IT");
  return getCache();
}
/**
 * Blocks until the given region reports a value for the given key, i.e. until
 * {@code r.containsValueForKey(key)} is true, polling through {@link GeodeAwaitility}.
 *
 * @param r the Region to poll
 * @param key the key of the Entry to poll for
 */
public static void waitForEntry(final Region r, final Object key) {
  GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
    @Override
    public String description() {
      return "Waiting for entry " + key + " on region " + r;
    }

    @Override
    public boolean done() {
      final boolean valuePresent = r.containsValueForKey(key);
      return valuePresent;
    }
  });
}
/**
 * Blocks until the named subregion exists under the given region, polling through
 * {@link GeodeAwaitility}, and then returns it.
 *
 * @param r the parent Region to poll
 * @param subRegName the name of the subregion to wait for
 * @return the subregion as returned by {@code r.getSubregion(subRegName)} after the wait
 */
public static Region waitForSubRegion(final Region r, final String subRegName) {
  WaitCriterion ev = new WaitCriterion() {
    @Override
    public boolean done() {
      return r.getSubregion(subRegName) != null;
    }

    @Override
    public String description() {
      return "Waiting for subregion " + subRegName;
    }
  };
  // The wait duration is whatever GeodeAwaitility.await() applies; the former local MAXWAIT
  // constant was never passed to it and has been removed.
  GeodeAwaitility.await().untilAsserted(ev);
  return r.getSubregion(subRegName);
}
/**
 * Cache loader that returns the requested key as the loaded value. When the loader argument is
 * an {@link Integer}, the load first sleeps for that many milliseconds.
 */
public static class CacheServerCacheLoader extends TestCacheLoader implements Declarable {
  public CacheServerCacheLoader() {}

  /**
   * @param helper supplies the key being loaded and the optional loader argument
   * @return the key from {@code helper}
   */
  @Override
  public Object load2(LoaderHelper helper) {
    final Object argument = helper.getArgument();
    if (argument instanceof Integer) {
      try {
        Thread.sleep(((Integer) argument).intValue());
      } catch (InterruptedException ugh) {
        // restore the interrupt status so code further up the stack can still observe it
        Thread.currentThread().interrupt();
        fail("interrupted");
      }
    }
    return helper.getKey();
  }

  /** No configuration is read from the supplied properties. */
  @Override
  public void init(Properties props) {}
}
/**
 * Create a server that has a value for every key queried and a unique key/value in the specified
 * Region that uniquely identifies each instance.
 *
 * @param vm the VM on which to create the server
 * @param rName the name of the Region to create on the server
 * @param port the TCP port on which the server should listen
 * @param notifyBySubscription not read by this method
 */
public void createBridgeServer(VM vm, final String rName, final int port,
    final boolean notifyBySubscription) {
  vm.invoke(new CacheSerializableRunnable("Create Region on Server") {
    @Override
    public void run2() {
      try {
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setConcurrencyChecksEnabled(false);
        // The loader returns the key as the value, so every queried key has a value.
        factory.setCacheLoader(new CacheServerCacheLoader());
        beginCacheXml();
        createRegion(rName, factory.create());
        startBridgeServer(port);
        finishCacheXml(rName + "-" + port);
        Region region = getRootRegion().getSubregion(rName);
        assertNotNull(region);
        // A unique key/value to identify the BridgeServer
        region.put("BridgeServer", Integer.valueOf(port));
      } catch (Exception e) {
        getSystem().getLogWriter().severe(e);
        // Keep the original exception as the cause so its stack trace reaches the test report.
        org.apache.geode.test.dunit.Assert.fail("Failed to start CacheServer " + e, e);
      }
    }
  });
}
/**
 * Test for bug 35884.
 * <p>
 * A replicated server region gets an entry created with a {@code null} value. A client region
 * with {@code Scope.LOCAL} connected through a pool must then see {@code null} both from
 * {@code getEntry} and from {@code get} for that key.
 */
@Test
public void test021ClientGetOfInvalidServerEntry() throws CacheException {
  final String regionName = getName() + "-1";
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM clientVm = dunitHost.getVM(2);

  // Create server1.
  serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory serverAttributes = new AttributesFactory();
      serverAttributes.setScope(Scope.DISTRIBUTED_ACK);
      serverAttributes.setDataPolicy(DataPolicy.REPLICATE);
      serverAttributes.setConcurrencyChecksEnabled(false);
      createRegion(regionName, serverAttributes.create());
      Wait.pause(1000);
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int serverPort = serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String serverHost = NetworkUtils.getServerHostName(serverVm.getHost());

  // Init values at server.
  serverVm.invoke(new CacheSerializableRunnable("Create values") {
    @Override
    public void run2() throws CacheException {
      // create it invalid
      getRootRegion().getSubregion(regionName).create("key-string-1", null);
    }
  });

  // The client region uses local scope and a pool pointing at the server.
  clientVm.invoke(new CacheSerializableRunnable("Create region 2") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      AttributesFactory clientAttributes = new AttributesFactory();
      clientAttributes.setScope(Scope.LOCAL);
      clientAttributes.setConcurrencyChecksEnabled(false);
      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
          .info("ZZZZZ host0:" + serverHost + " port:" + serverPort);
      ClientServerTestCase.configureConnectionPool(clientAttributes, serverHost, serverPort, -1,
          false, -1, -1, null);
      org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
          .info("ZZZZZDone host0:" + serverHost + " port:" + serverPort);
      createRegion(regionName, clientAttributes.create());
    }
  });

  // get the invalid entry on the client.
  clientVm.invoke(new CacheSerializableRunnable("get values on client") {
    @Override
    public void run2() throws CacheException {
      Region clientRegion = getRootRegion().getSubregion(regionName);
      assertEquals(null, clientRegion.getEntry("key-string-1"));
      assertEquals(null, clientRegion.get("key-string-1"));
    }
  });

  serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Verifies that, after a client has created and then destroyed its pool, the server's
 * {@code CacheClientNotifierStats} report as many client unregister requests as client register
 * requests.
 * <p>
 * Flow: start a replicated server region, create a client region with a subscription-enabled pool
 * and a listener, put values on the server and on the client, locally destroy the client region
 * and destroy its pool, then wait on every cache server of the server VM until the two counters
 * match.
 */
@Test
public void test022ClientRegisterUnregisterRequests() throws CacheException {
  final String regionName1 = this.getName() + "-1";
  final Host host = Host.getHost(0);
  VM server1 = host.getVM(0);
  VM client = host.getVM(2);
  SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.REPLICATE);
      factory.setConcurrencyChecksEnabled(false);
      createRegion(regionName1, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  // Create server1.
  server1.invoke(createServer);
  final int port = server1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(server1.getHost());
  SerializableRunnable createPool = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory regionFactory = new AttributesFactory();
      regionFactory.setScope(Scope.LOCAL);
      regionFactory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(regionFactory, host0, port, -1, true, -1, -1,
          null);
      Region region1 = createRegion(regionName1, regionFactory.create());
      region1.getAttributesMutator().addCacheListener(new CertifiableTestCacheListener(
          org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()));
    }
  };
  // Create client.
  client.invoke(createPool);
  // Init values at server.
  server1.invoke(new CacheSerializableRunnable("Create values") {
    @Override
    public void run2() throws CacheException {
      Region region1 = getRootRegion().getSubregion(regionName1);
      for (int i = 0; i < 20; i++) {
        region1.put("key-string-" + i, "value-" + i);
      }
    }
  });
  // Put some values on the client.
  client.invoke(new CacheSerializableRunnable("Put values client") {
    @Override
    public void run2() throws CacheException {
      Region region1 = getRootRegion().getSubregion(regionName1);
      for (int i = 0; i < 10; i++) {
        region1.put("key-string-" + i, "client-value-" + i);
      }
    }
  });
  SerializableRunnable closePool = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region1 = getRootRegion().getSubregion(regionName1);
      // Read the pool name before the region goes away, then tear down region and pool.
      String pName = region1.getAttributes().getPoolName();
      region1.localDestroyRegion();
      PoolImpl p = (PoolImpl) PoolManager.find(pName);
      p.destroy();
    }
  };
  client.invoke(closePool);
  SerializableRunnable validateClientRegisterUnRegister =
      new CacheSerializableRunnable("validate Client Register UnRegister") {
        @Override
        public void run2() throws CacheException {
          for (Iterator bi = getCache().getCacheServers().iterator(); bi.hasNext();) {
            CacheServerImpl bsi = (CacheServerImpl) bi.next();
            final CacheClientNotifierStats ccnStats =
                bsi.getAcceptor().getCacheClientNotifier().getStats();
            WaitCriterion ev = new WaitCriterion() {
              @Override
              public boolean done() {
                return ccnStats.getClientRegisterRequests() == ccnStats
                    .getClientUnRegisterRequests();
              }

              @Override
              public String description() {
                // A real description makes a timeout diagnosable from the failure message.
                return "Waiting for client register requests ("
                    + ccnStats.getClientRegisterRequests()
                    + ") to equal client unregister requests ("
                    + ccnStats.getClientUnRegisterRequests() + ")";
              }
            };
            GeodeAwaitility.await().untilAsserted(ev);
            assertEquals("HealthMonitor Client Register/UnRegister mismatch.",
                ccnStats.getClientRegisterRequests(), ccnStats.getClientUnRegisterRequests());
          }
        }
      };
  server1.invoke(validateClientRegisterUnRegister);
  server1.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests the containsKeyOnServer operation of the {@link Pool}
 * <p>
 * Two client VMs attach to one server region. Before any data is written, the second client must
 * see {@code containsKeyOnServer} return {@code false} for both an {@link Integer} key and a
 * {@link String} key; after the first client has put both keys it must return {@code true}.
 *
 * @since GemFire 5.0.2
 */
@Test
public void test023ContainsKeyOnServer() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setConcurrencyChecksEnabled(false);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm2.invoke(create);
  // One numeric key and one string key that print the same but are distinct keys.
  final Integer key1 = Integer.valueOf(0);
  final String key2 = "0";
  vm2.invoke(new CacheSerializableRunnable("Contains key on server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      assertFalse(region.containsKeyOnServer(key1));
      assertFalse(region.containsKeyOnServer(key2));
    }
  });
  vm1.invoke(new CacheSerializableRunnable("Put values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.put(Integer.valueOf(0), Integer.valueOf(0));
      region.put("0", "0");
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Contains key on server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      assertTrue(region.containsKeyOnServer(key1));
      assertTrue(region.containsKeyOnServer(key2));
    }
  });
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests that invoking {@link Region#create} with a <code>null</code> value does the right thing
 * with the {@link Pool}.
 * <p>
 * One client creates ten entries with {@code null} values and checks that each entry exists
 * locally with a {@code null} value. A second client then creates values for the same keys, and
 * the first client checks again that its own entries still exist with {@code null} values.
 *
 * @since GemFire 3.5
 */
@Test
public void test024CreateNullValue() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  final Object createCallbackArg = "CREATE CALLBACK ARG";
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm2.invoke(create);
  vm2.invoke(new CacheSerializableRunnable("Create nulls") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.create(Integer.valueOf(i), null, createCallbackArg);
      }
    }
  });
  Wait.pause(1000); // Wait for updates to be propagated
  vm2.invoke(new CacheSerializableRunnable("Verify invalidates") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Region.Entry entry = region.getEntry(Integer.valueOf(i));
        assertNotNull(entry);
        assertNull(entry.getValue());
      }
    }
  });
  vm1.invoke(new CacheSerializableRunnable("Attempt to create values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.create(Integer.valueOf(i), "new" + i);
      }
    }
  });
  Wait.pause(1000); // Wait for updates to be propagated
  // The entries in vm2 are expected to be unchanged by the creates issued from vm1.
  vm2.invoke(new CacheSerializableRunnable("Verify invalidates") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        Region.Entry entry = region.getEntry(Integer.valueOf(i));
        assertNotNull(entry);
        assertNull(entry.getValue());
      }
    }
  });
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests that a {@link Region#localDestroy} is not propagated to the server and that a
 * {@link Region#destroy} is. Also makes sure that callback arguments are passed correctly.
 * <p>
 * The server region carries a cache writer whose {@code beforeDestroy2} asserts that the callback
 * argument of every destroy equals the one used by the client. Both clients register interest in
 * all keys, so the distributed destroy issued by the first client is expected to remove the
 * entries from the second client as well.
 */
@Test
public void test025Destroy() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  final Object callbackArg = "DESTROY CALLBACK";
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      CacheWriter cw = new TestCacheWriter() {
        @Override
        public void beforeCreate2(EntryEvent event) throws CacheWriterException {
          // creates are accepted without any checks
        }

        @Override
        public void beforeDestroy2(EntryEvent event) throws CacheWriterException {
          Object beca = event.getCallbackArgument();
          assertEquals(callbackArg, beca);
        }
      };
      AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      Region rgn = createRegion(name, factory.create());
      rgn.registerInterestRegex(".*", false, false);
    }
  };
  vm1.invoke(create);
  vm1.invoke(new CacheSerializableRunnable("Populate region") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.put(Integer.valueOf(i), String.valueOf(i));
      }
    }
  });
  vm2.invoke(create);
  vm2.invoke(new CacheSerializableRunnable("Load region") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        assertEquals(String.valueOf(i), region.get(Integer.valueOf(i)));
      }
    }
  });
  vm1.invoke(new CacheSerializableRunnable("Local destroy") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.localDestroy(Integer.valueOf(i));
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("No destroy propagate") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        assertEquals(String.valueOf(i), region.get(Integer.valueOf(i)));
      }
    }
  });
  vm1.invoke(new CacheSerializableRunnable("Fetch from server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        assertEquals(String.valueOf(i), region.get(Integer.valueOf(i)));
      }
    }
  });
  vm0.invoke(new CacheSerializableRunnable("Check no server cache writer") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      TestCacheWriter writer = getTestWriter(region);
      // The boolean result is deliberately not asserted here.
      // NOTE(review): presumably this call only resets the writer's "invoked" state before the
      // distributed destroys below; confirm against TestCacheWriter.
      writer.wasInvoked();
    }
  });
  vm1.invoke(new CacheSerializableRunnable("Distributed destroy") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        region.destroy(Integer.valueOf(i), callbackArg);
      }
    }
  });
  Wait.pause(1000); // Wait for destroys to propagate
  vm1.invoke(new CacheSerializableRunnable("Attempt get from server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        assertNull(region.getEntry(Integer.valueOf(i)));
      }
    }
  });
  vm2.invoke(new CacheSerializableRunnable("Validate destroy propagate") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < 10; i++) {
        assertNull(region.getEntry(Integer.valueOf(i)));
      }
    }
  });
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests that a {@link Region#localDestroyRegion} is not propagated to the server and that a
 * {@link Region#destroyRegion} is. Also makes sure that callback arguments are passed correctly.
 * <p>
 * Currently disabled through {@code @Ignore}.
 */
@Ignore("TODO")
@Test
public void testDestroyRegion() throws CacheException {
  final String regionName = getName();
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM firstClient = dunitHost.getVM(1);
  final VM secondClient = dunitHost.getVM(2);
  final Object callbackArg = "DESTROY CALLBACK";

  serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // Writer that checks the callback argument of every region destroy reaching the server.
      CacheWriter checkingWriter = new TestCacheWriter() {
        @Override
        public void beforeCreate2(EntryEvent event) throws CacheWriterException {}

        @Override
        public void beforeRegionDestroy2(RegionEvent event) throws CacheWriterException {
          assertEquals(callbackArg, event.getCallbackArgument());
        }
      };
      AttributesFactory serverAttributes = getBridgeServerRegionAttributes(null, checkingWriter);
      createRegion(regionName, serverAttributes.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  final int serverPort = serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String serverHost = NetworkUtils.getServerHostName(serverVm.getHost());
  final SerializableRunnable createClientRegion =
      new CacheSerializableRunnable("Create region") {
        @Override
        public void run2() throws CacheException {
          getLonerSystem();
          getCache();
          AttributesFactory clientAttributes = new AttributesFactory();
          clientAttributes.setScope(Scope.LOCAL);
          clientAttributes.setConcurrencyChecksEnabled(false);
          ClientServerTestCase.configureConnectionPool(clientAttributes, serverHost, serverPort,
              -1, true, -1, -1, null);
          createRegion(regionName, clientAttributes.create());
        }
      };
  firstClient.invoke(createClientRegion);
  secondClient.invoke(createClientRegion);

  firstClient.invoke(new CacheSerializableRunnable("Local destroy region") {
    @Override
    public void run2() throws CacheException {
      getRootRegion().getSubregion(regionName).localDestroyRegion();
      assertNull(getRootRegion().getSubregion(regionName));
      // No explicit pool close here: locally destroying the region takes care of it.
    }
  });

  secondClient.invoke(new CacheSerializableRunnable("No destroy propagate") {
    @Override
    public void run2() throws CacheException {
      assertNotNull(getRootRegion().getSubregion(regionName));
    }
  });

  serverVm.invoke(new CacheSerializableRunnable("Check no server cache writer") {
    @Override
    public void run2() throws CacheException {
      TestCacheWriter serverWriter = getTestWriter(getRootRegion().getSubregion(regionName));
      serverWriter.wasInvoked();
    }
  });

  // Re-create the region that was locally destroyed, then destroy it for real.
  firstClient.invoke(createClientRegion);
  firstClient.invoke(new CacheSerializableRunnable("Distributed destroy region") {
    @Override
    public void run2() throws CacheException {
      Region clientRegion = getRootRegion().getSubregion(regionName);
      assertNotNull(clientRegion);
      clientRegion.destroyRegion(callbackArg);
      assertNull(getRootRegion().getSubregion(regionName));
      // No explicit pool close here: destroying the region takes care of it.
    }
  });

  Wait.pause(1000); // Wait for destroys to propagate

  secondClient.invoke(new CacheSerializableRunnable("Verify destroy propagate") {
    @Override
    public void run2() throws CacheException {
      assertNull(getRootRegion().getSubregion(regionName));
    }
  });

  serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests interest list registration with callback arg with DataPolicy.EMPTY and InterestPolicy.ALL
 * <p>
 * A subscriber client registers interest in {@code "key-1"}; a publisher client then creates,
 * updates and destroys that key, each time with a callback argument. The subscriber's
 * {@code ControlListener} must record exactly three events (create, update, destroy) carrying the
 * expected key, values, operation and callback argument.
 */
@Test
public void test026DPEmptyInterestListRegistrationWithCallbackArg() throws CacheException {
  final String regionName = getName();
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM subscriberVm = dunitHost.getVM(1);
  final VM publisherVm = dunitHost.getVM(2);

  // Create cache server
  serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // Loader that answers every load with the key itself.
      CacheLoader keyEchoLoader = new CacheLoader() {
        @Override
        public void close() {}

        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey();
        }
      };
      AttributesFactory serverAttributes = getBridgeServerRegionAttributes(keyEchoLoader, null);
      createRegion(regionName, serverAttributes.create());
      Wait.pause(1000);
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  // Create cache server clients
  final int serverPort = serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String serverHost = NetworkUtils.getServerHostName(serverVm.getHost());

  final SerializableRunnable createSubscriber = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      AttributesFactory clientAttributes = new AttributesFactory();
      clientAttributes.setScope(Scope.LOCAL);
      clientAttributes.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(clientAttributes, serverHost, serverPort, -1,
          true, -1, -1, null);
      clientAttributes.addCacheListener(new ControlListener());
      clientAttributes.setDataPolicy(DataPolicy.EMPTY);
      clientAttributes.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
      createRegion(regionName, clientAttributes.create());
    }
  };
  final SerializableRunnable createPublisher =
      new CacheSerializableRunnable("Create publisher region") {
        @Override
        public void run2() throws CacheException {
          getLonerSystem();
          AttributesFactory clientAttributes = new AttributesFactory();
          clientAttributes.setScope(Scope.LOCAL);
          clientAttributes.setConcurrencyChecksEnabled(false);
          ClientServerTestCase.configureConnectionPool(clientAttributes, serverHost, serverPort,
              -1, true, -1, -1, null);
          clientAttributes.addCacheListener(new ControlListener());
          // make sure empty works with client publishers
          clientAttributes.setDataPolicy(DataPolicy.EMPTY);
          createRegion(regionName, clientAttributes.create());
        }
      };
  subscriberVm.invoke(createSubscriber);
  publisherVm.invoke(createPublisher);

  // Subscriber registers interest
  subscriberVm.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      try {
        // This call will cause no value to be put into the region
        clientRegion.registerInterest("key-1", InterestResultPolicy.NONE);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex);
      }
    }
  });

  // Publisher creates the entry (expected to produce a create event)
  publisherVm.invoke(new CacheSerializableRunnable("Put Value") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      clientRegion.create("key-1", "key-1-create", "key-1-create");
    }
  });

  // Publisher updates the entry (expected to produce an update event)
  publisherVm.invoke(new CacheSerializableRunnable("Put Value") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      clientRegion.put("key-1", "key-1-update", "key-1-update");
    }
  });

  // Publisher destroys the entry (expected to produce a destroy event)
  publisherVm.invoke(new CacheSerializableRunnable("Destroy Entry") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      clientRegion.destroy("key-1", "key-1-destroy");
    }
  });

  final SerializableRunnable verifyEvents = new CacheSerializableRunnable("Verify events") {
    @Override
    public void run2() throws CacheException {
      Region clientRegion = getRootRegion().getSubregion(regionName);
      ControlListener controlListener =
          (ControlListener) clientRegion.getAttributes().getCacheListeners()[0];
      int expectedEvents = 3;
      controlListener.waitWhileNotEnoughEvents(60000, expectedEvents);
      assertEquals(expectedEvents, controlListener.events.size());

      Object expectedKey = "key-1";

      // First event: the create
      EventWrapper created = (EventWrapper) controlListener.events.get(0);
      assertEquals(TYPE_CREATE, created.type);
      assertEquals(expectedKey, created.event.getKey());
      assertEquals(null, created.event.getOldValue());
      assertEquals(false, created.event.isOldValueAvailable());
      assertEquals("key-1-create", created.event.getNewValue());
      assertEquals(Operation.CREATE, created.event.getOperation());
      assertEquals("key-1-create", created.event.getCallbackArgument());
      assertEquals(true, created.event.isOriginRemote());

      // Second event: the update
      EventWrapper updated = (EventWrapper) controlListener.events.get(1);
      assertEquals(TYPE_UPDATE, updated.type);
      assertEquals(expectedKey, updated.event.getKey());
      assertEquals(null, updated.event.getOldValue());
      assertEquals(false, updated.event.isOldValueAvailable());
      assertEquals("key-1-update", updated.event.getNewValue());
      assertEquals(Operation.UPDATE, updated.event.getOperation());
      assertEquals("key-1-update", updated.event.getCallbackArgument());
      assertEquals(true, updated.event.isOriginRemote());

      // Third event: the destroy
      EventWrapper destroyed = (EventWrapper) controlListener.events.get(2);
      assertEquals(TYPE_DESTROY, destroyed.type);
      assertEquals("key-1-destroy", destroyed.arg);
      assertEquals(expectedKey, destroyed.event.getKey());
      assertEquals(null, destroyed.event.getOldValue());
      assertEquals(false, destroyed.event.isOldValueAvailable());
      assertEquals(null, destroyed.event.getNewValue());
      assertEquals(Operation.DESTROY, destroyed.event.getOperation());
      assertEquals("key-1-destroy", destroyed.event.getCallbackArgument());
      assertEquals(true, destroyed.event.isOriginRemote());
    }
  };
  subscriberVm.invoke(verifyEvents);

  // Close cache server clients
  final SerializableRunnable closeClient = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      getRootRegion().getSubregion(regionName).localDestroyRegion();
    }
  };
  subscriberVm.invoke(closeClient);
  publisherVm.invoke(closeClient);

  // Stop cache server
  serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests interest list registration with callback arg with DataPolicy.EMPTY and
 * InterestPolicy.CACHE_CONTENT
 * <p>
 * Same scenario as the InterestPolicy.ALL variant, except that the subscriber's
 * {@code ControlListener} is expected to record no events at all after the publisher has created,
 * updated and destroyed {@code "key-1"}.
 */
@Test
public void test027DPEmptyCCInterestListRegistrationWithCallbackArg() throws CacheException {
  final String regionName = getName();
  final Host dunitHost = Host.getHost(0);
  final VM serverVm = dunitHost.getVM(0);
  final VM subscriberVm = dunitHost.getVM(1);
  final VM publisherVm = dunitHost.getVM(2);

  // Create cache server
  serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // Loader that answers every load with the key itself.
      CacheLoader keyEchoLoader = new CacheLoader() {
        @Override
        public void close() {}

        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey();
        }
      };
      AttributesFactory serverAttributes = getBridgeServerRegionAttributes(keyEchoLoader, null);
      createRegion(regionName, serverAttributes.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  // Create cache server clients
  final int serverPort = serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String serverHost = NetworkUtils.getServerHostName(serverVm.getHost());

  final SerializableRunnable createSubscriber = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      AttributesFactory clientAttributes = new AttributesFactory();
      clientAttributes.setScope(Scope.LOCAL);
      clientAttributes.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(clientAttributes, serverHost, serverPort, -1,
          true, -1, -1, null);
      clientAttributes.setCacheListener(new ControlListener());
      clientAttributes.setDataPolicy(DataPolicy.EMPTY);
      clientAttributes
          .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
      createRegion(regionName, clientAttributes.create());
    }
  };
  final SerializableRunnable createPublisher =
      new CacheSerializableRunnable("Create publisher region") {
        @Override
        public void run2() throws CacheException {
          getLonerSystem();
          AttributesFactory clientAttributes = new AttributesFactory();
          clientAttributes.setScope(Scope.LOCAL);
          clientAttributes.setConcurrencyChecksEnabled(false);
          ClientServerTestCase.configureConnectionPool(clientAttributes, serverHost, serverPort,
              -1, true, -1, -1, null);
          clientAttributes.setCacheListener(new ControlListener());
          // make sure empty works with client publishers
          clientAttributes.setDataPolicy(DataPolicy.EMPTY);
          createRegion(regionName, clientAttributes.create());
        }
      };
  subscriberVm.invoke(createSubscriber);
  publisherVm.invoke(createPublisher);

  // Subscriber registers interest
  subscriberVm.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      try {
        // This call will cause no value to be put into the region
        clientRegion.registerInterest("key-1", InterestResultPolicy.NONE);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex);
      }
    }
  });

  // Publisher creates the entry
  publisherVm.invoke(new CacheSerializableRunnable("Put Value") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      clientRegion.create("key-1", "key-1-create", "key-1-create");
    }
  });

  // Publisher updates the entry
  publisherVm.invoke(new CacheSerializableRunnable("Put Value") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      clientRegion.put("key-1", "key-1-update", "key-1-update");
    }
  });

  // Publisher destroys the entry
  publisherVm.invoke(new CacheSerializableRunnable("Destroy Entry") {
    @Override
    public void run2() throws CacheException {
      LocalRegion clientRegion = (LocalRegion) getRootRegion().getSubregion(regionName);
      clientRegion.destroy("key-1", "key-1-destroy");
    }
  });

  final SerializableRunnable verifyNoEvents = new CacheSerializableRunnable("Verify events") {
    @Override
    public void run2() throws CacheException {
      Region clientRegion = getRootRegion().getSubregion(regionName);
      ControlListener controlListener =
          (ControlListener) clientRegion.getAttributes().getCacheListeners()[0];
      // we should not get any events but give some time for the server to send them
      Wait.pause(1000);
      assertEquals(0, controlListener.events.size());
    }
  };
  subscriberVm.invoke(verifyNoEvents);

  // Close cache server clients
  final SerializableRunnable closeClient = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      getRootRegion().getSubregion(regionName).localDestroyRegion();
    }
  };
  subscriberVm.invoke(closeClient);
  publisherVm.invoke(closeClient);

  // Stop cache server
  serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests dynamic region creation initiated from a bridge client, which must cause the region to be
 * created on two different cache servers.
 *
 * <p>
 * Also tests the reverse situation: a dynamic region is created on a cache server and the same
 * region is expected to be created on the client.
 *
 * <p>
 * Note: This test re-creates distributed systems for its own purposes and uses a loner
 * distributed system to isolate the bridge client.
 */
@Test
public void test028DynamicRegionCreation() throws Exception {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  final VM client1 = host.getVM(0);
  final VM srv1 = host.getVM(2);
  final VM srv2 = host.getVM(3);
  final String k1 = name + "-key1";
  final String v1 = name + "-val1";
  final String k2 = name + "-key2";
  final String v2 = name + "-val2";
  final String k3 = name + "-key3";
  final String v3 = name + "-val3";

  // Start from a clean slate: every participating VM gets a fresh distributed system below.
  client1.invoke(() -> disconnectFromDS());
  srv1.invoke(() -> disconnectFromDS());
  srv2.invoke(() -> disconnectFromDS());
  try {
    // setup servers
    CacheSerializableRunnable ccs = new CacheSerializableRunnable("Create Cache Server") {
      @Override
      public void run2() throws CacheException {
        createDynamicRegionCache(name, (String) null); // Creates a new DS and Cache
        assertTrue(DynamicRegionFactory.get().isOpen());
        try {
          startBridgeServer(0);
        } catch (IOException ugh) {
          // Keep the cause so a startup failure can actually be diagnosed.
          org.apache.geode.test.dunit.Assert.fail("cache server startup failed", ugh);
        }
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.REPLICATE);
        factory.setConcurrencyChecksEnabled(false);
        Region region = createRootRegion(name, factory.create());
        region.put(k1, v1);
        Assert.assertTrue(region.get(k1).equals(v1));
      }
    };
    srv1.invoke(ccs);
    srv2.invoke(ccs);

    final String srv1Host = NetworkUtils.getServerHostName(srv1.getHost());
    final int srv1Port = srv1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
    final int srv2Port = srv2.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());

    // setup clients, do basic tests to make sure pool with notifier work as advertised
    client1.invoke(new CacheSerializableRunnable("Create Cache Client") {
      @Override
      public void run2() throws CacheException {
        createLonerDS();
        AttributesFactory factory = new AttributesFactory();
        factory.setConcurrencyChecksEnabled(false);
        Pool cp = ClientServerTestCase.configureConnectionPool(factory, srv1Host, srv1Port,
            srv2Port, true, -1, -1, null);
        {
          final PoolImpl pool = (PoolImpl) cp;
          WaitCriterion ev = new WaitCriterion() {
            @Override
            public boolean done() {
              return pool.getPrimary() != null && pool.getRedundants().size() >= 1;
            }

            @Override
            public String description() {
              return "waiting for pool to have a primary and at least one redundant server";
            }
          };
          GeodeAwaitility.await().untilAsserted(ev);
          assertNotNull(pool.getPrimary());
          assertTrue("backups=" + pool.getRedundants() + " expected=" + 1,
              pool.getRedundants().size() >= 1);
        }
        createDynamicRegionCache(name, "testPool");
        assertTrue(DynamicRegionFactory.get().isOpen());
        factory.setScope(Scope.LOCAL);
        factory.setConcurrencyChecksEnabled(false);
        factory.setCacheListener(new CertifiableTestCacheListener(
            org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()));
        Region region = createRootRegion(name, factory.create());
        assertNull(region.getEntry(k1));
        // this should match the key
        region.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES);
        assertEquals(v1, region.getEntry(k1).getValue()); // Update via registered interest
        assertNull(region.getEntry(k2));
        region.put(k2, v2); // use the Pool
        // Ensure that the notifier didn't un-do the put, bug 35355
        assertEquals(v2, region.getEntry(k2).getValue());
        region.put(k3, v3); // setup a key for invalidation from a notifier
      }
    });

    srv1.invoke(new CacheSerializableRunnable("Validate Server1 update") {
      @Override
      public void run2() throws CacheException {
        CacheClientNotifier ccn = getInstance();
        final CacheClientNotifierStats ccnStats = ccn.getStats();
        // Snapshot taken before the put below so the wait can detect the new event.
        final int eventCount = ccnStats.getEvents();
        Region r = getRootRegion(name);
        assertNotNull(r);
        // Validate the Pool worked, getEntry works because of the mirror
        assertEquals(v2, r.getEntry(k2).getValue());
        // Make sure we have the other entry to use for notification
        assertEquals(v3, r.getEntry(k3).getValue());
        r.put(k3, v1); // Change k3, sending some data to the client notifier
        // Wait for the update to propagate to the clients
        WaitCriterion ev = new WaitCriterion() {
          @Override
          public boolean done() {
            return ccnStats.getEvents() > eventCount;
          }

          @Override
          public String description() {
            return "waiting for ccnStat";
          }
        };
        GeodeAwaitility.await().untilAsserted(ev);
      }
    });

    srv2.invoke(new CacheSerializableRunnable("Validate Server2 update") {
      @Override
      public void run2() throws CacheException {
        Region r = getRootRegion(name);
        assertNotNull(r);
        // Validate the Pool worked, getEntry works because of the mirror
        assertEquals(v2, r.getEntry(k2).getValue());
        assertEquals(v1, r.getEntry(k3).getValue()); // From peer update
      }
    });

    client1.invoke(new CacheSerializableRunnable("Validate Client notification") {
      @Override
      public void run2() throws CacheException {
        Region r = getRootRegion(name);
        assertNotNull(r);
        CertifiableTestCacheListener ctl =
            (CertifiableTestCacheListener) r.getAttributes().getCacheListener();
        ctl.waitForUpdated(k3);
        assertEquals(v1, r.getEntry(k3).getValue()); // Ensure that the notifier updated the entry
      }
    });

    // Ok, now we are ready to do some dynamic region action!
    final String v1Dynamic = v1 + "dynamic";
    final String dynFromClientName = name + "-dynamic-client";
    final String dynFromServerName = name + "-dynamic-server";

    client1.invoke(new CacheSerializableRunnable("Client dynamic region creation") {
      @Override
      public void run2() throws CacheException {
        assertTrue(DynamicRegionFactory.get().isOpen());
        Region r = getRootRegion(name);
        assertNotNull(r);
        Region dr = DynamicRegionFactory.get().createDynamicRegion(name, dynFromClientName);
        assertNull(dr.get(k1)); // This should be enough to validate the creation on the server
        dr.put(k1, v1Dynamic);
        assertEquals(v1Dynamic, dr.getEntry(k1).getValue());
      }
    });

    // Assert the servers have the dynamic region and the new value
    CacheSerializableRunnable valDR =
        new CacheSerializableRunnable("Validate dynamic region creation on server") {
          @Override
          public void run2() throws CacheException {
            Region r = getRootRegion(name);
            assertNotNull(r);
            long end = System.currentTimeMillis() + 10000;
            Region dr = null;
            // Poll for up to 10 seconds; rethrow the last assertion failure on timeout.
            for (;;) {
              try {
                dr = r.getSubregion(dynFromClientName);
                assertNotNull(dr);
                assertNotNull(getCache().getRegion(name + Region.SEPARATOR + dynFromClientName));
                break;
              } catch (AssertionError e) {
                if (System.currentTimeMillis() > end) {
                  throw e;
                }
                // Back off between polls instead of spinning on the CPU.
                Wait.pause(100);
              }
            }
            assertEquals(v1Dynamic, dr.getEntry(k1).getValue());
          }
        };
    srv1.invoke(valDR);
    srv2.invoke(valDR);

    // now delete the dynamic region and see if it goes away on servers
    client1.invoke(new CacheSerializableRunnable("Client dynamic region destruction") {
      @Override
      public void run2() throws CacheException {
        assertTrue(DynamicRegionFactory.get().isActive());
        Region r = getRootRegion(name);
        assertNotNull(r);
        String drName = r.getFullPath() + Region.SEPARATOR + dynFromClientName;
        assertNotNull(getCache().getRegion(drName));
        DynamicRegionFactory.get().destroyDynamicRegion(drName);
        assertNull(getCache().getRegion(drName));
      }
    });

    // Assert the servers no longer have the dynamic region
    CacheSerializableRunnable valNoDR =
        new CacheSerializableRunnable("Validate dynamic region destruction on server") {
          @Override
          public void run2() throws CacheException {
            Region r = getRootRegion(name);
            assertNotNull(r);
            String drName = r.getFullPath() + Region.SEPARATOR + dynFromClientName;
            assertNull(getCache().getRegion(drName));
            try {
              DynamicRegionFactory.get().destroyDynamicRegion(drName);
              fail("expected RegionDestroyedException");
            } catch (RegionDestroyedException expected) {
              // expected: the region is already gone
            }
          }
        };
    srv1.invoke(valNoDR);
    srv2.invoke(valNoDR);

    // Now try the reverse, create a dynamic region on the server and see if the client
    // has it
    srv2.invoke(new CacheSerializableRunnable("Server dynamic region creation") {
      @Override
      public void run2() throws CacheException {
        Region r = getRootRegion(name);
        assertNotNull(r);
        Region dr = DynamicRegionFactory.get().createDynamicRegion(name, dynFromServerName);
        assertNull(dr.get(k1));
        dr.put(k1, v1Dynamic);
        assertEquals(v1Dynamic, dr.getEntry(k1).getValue());
      }
    });

    // Assert the servers have the dynamic region and the new value
    srv1.invoke(new CacheSerializableRunnable(
        "Validate dynamic region creation propagation to other server") {
      @Override
      public void run2() throws CacheException {
        Region r = getRootRegion(name);
        assertNotNull(r);
        Region dr = waitForSubRegion(r, dynFromServerName);
        assertNotNull(dr);
        assertNotNull(getCache().getRegion(name + Region.SEPARATOR + dynFromServerName));
        waitForEntry(dr, k1);
        assertNotNull(dr.getEntry(k1));
        assertEquals(v1Dynamic, dr.getEntry(k1).getValue());
      }
    });

    // Assert the clients have the dynamic region and the new value
    client1.invoke(new CacheSerializableRunnable("Validate dynamic region creation on client") {
      @Override
      public void run2() throws CacheException {
        Region r = getRootRegion(name);
        assertNotNull(r);
        long end = System.currentTimeMillis() + 10000;
        Region dr = null;
        // Poll for up to 10 seconds; rethrow the last assertion failure on timeout.
        for (;;) {
          try {
            dr = r.getSubregion(dynFromServerName);
            assertNotNull(dr);
            assertNotNull(getCache().getRegion(name + Region.SEPARATOR + dynFromServerName));
            break;
          } catch (AssertionError e) {
            if (System.currentTimeMillis() > end) {
              throw e;
            } else {
              Wait.pause(1000);
            }
          }
        }
        waitForEntry(dr, k1);
        assertNotNull(dr.getEntry(k1));
        assertEquals(v1Dynamic, dr.getEntry(k1).getValue());
      }
    });

    // now delete the dynamic region on a server and see if it goes away on client
    srv2.invoke(new CacheSerializableRunnable("Server dynamic region destruction") {
      @Override
      public void run2() throws CacheException {
        assertTrue(DynamicRegionFactory.get().isActive());
        Region r = getRootRegion(name);
        assertNotNull(r);
        String drName = r.getFullPath() + Region.SEPARATOR + dynFromServerName;
        assertNotNull(getCache().getRegion(drName));
        DynamicRegionFactory.get().destroyDynamicRegion(drName);
        assertNull(getCache().getRegion(drName));
      }
    });

    srv1.invoke(
        new CacheSerializableRunnable("Validate dynamic region destruction on other server") {
          @Override
          public void run2() throws CacheException {
            Region r = getRootRegion(name);
            assertNotNull(r);
            String drName = r.getFullPath() + Region.SEPARATOR + dynFromServerName;
            {
              // Give the destroy up to 100 * 100ms to propagate.
              int retry = 100;
              while (retry-- > 0 && getCache().getRegion(drName) != null) {
                try {
                  Thread.sleep(100);
                } catch (InterruptedException ignore) {
                  fail("interrupted");
                }
              }
            }
            assertNull(getCache().getRegion(drName));
          }
        });

    // Assert the clients no longer have the dynamic region
    client1
        .invoke(new CacheSerializableRunnable("Validate dynamic region destruction on client") {
          @Override
          public void run2() throws CacheException {
            Region r = getRootRegion(name);
            assertNotNull(r);
            String drName = r.getFullPath() + Region.SEPARATOR + dynFromServerName;
            {
              // Give the destroy up to 100 * 100ms to propagate.
              int retry = 100;
              while (retry-- > 0 && getCache().getRegion(drName) != null) {
                try {
                  Thread.sleep(100);
                } catch (InterruptedException ignore) {
                  fail("interrupted");
                }
              }
            }
            assertNull(getCache().getRegion(drName));
            // sleep to make sure that the dynamic region entry from the internal
            // region,dynamicRegionList in DynamicRegionFactory // ?
            try {
              Thread.sleep(10000);
            } catch (InterruptedException ignore) {
              fail("interrupted");
            }
            try {
              DynamicRegionFactory.get().destroyDynamicRegion(drName);
              fail("expected RegionDestroyedException");
            } catch (RegionDestroyedException expected) {
              // expected: the region is already gone
            }
          }
        });
  } finally {
    client1.invoke(() -> disconnectFromDS()); // clean-up loner
    srv1.invoke(() -> disconnectFromDS());
    srv2.invoke(() -> disconnectFromDS());
  }
}
/**
 * Test for bug 36279: a client creates an entry whose value is an empty byte array, and the value
 * must then be visible as a non-null, zero-length byte array on both the client and the server.
 */
@Test
public void test029EmptyByteArray() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  final Object createCallbackArg = "CREATE CALLBACK ARG";
  // Number of entries created and verified; keys are the integers [0, numberOfEntries).
  final int numberOfEntries = 1;

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);

  vm1.invoke(new CacheSerializableRunnable("Create empty byte array") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < numberOfEntries; i++) {
        region.create(Integer.valueOf(i), new byte[0], createCallbackArg);
      }
    }
  });

  vm1.invoke(new CacheSerializableRunnable("Verify values on client") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < numberOfEntries; i++) {
        Region.Entry entry = region.getEntry(Integer.valueOf(i));
        assertNotNull(entry);
        byte[] value = (byte[]) entry.getValue();
        assertNotNull(value);
        assertEquals(0, value.length);
      }
    }
  });

  vm0.invoke(new CacheSerializableRunnable("Verify values on server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < numberOfEntries; i++) {
        Region.Entry entry = region.getEntry(Integer.valueOf(i));
        assertNotNull(entry);
        byte[] value = (byte[]) entry.getValue();
        assertNotNull(value);
        assertEquals(0, value.length);
      }
    }
  });

  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests interest list registration with callback arg.
 *
 * <p>
 * One client (vm1) registers interest in "key-1" with {@link InterestResultPolicy#NONE}; a second
 * client (vm2) then creates, updates and destroys that key, each time passing a callback
 * argument. The test verifies that vm1's listener receives exactly three events (create, update,
 * destroy) with the expected key, old/new values, operation and callback argument, and that each
 * event is flagged as remotely originated.
 */
@Test
public void test030InterestListRegistrationWithCallbackArg() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  // Create cache server
  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      // Loader that returns the requested key as its own value.
      CacheLoader cl = new CacheLoader() {
        @Override
        public Object load(LoaderHelper helper) {
          return helper.getKey();
        }

        @Override
        public void close() {}
      };
      AttributesFactory factory = getBridgeServerRegionAttributes(cl, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  // Create cache server clients
  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      // create bridge writer
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      factory.setCacheListener(new ControlListener());
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm2.invoke(create);

  // VM1 Register interest
  vm1.invoke(new CacheSerializableRunnable("Create Entries and Register Interest") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      try {
        // This call will cause no value to be put into the region
        region.registerInterest("key-1", InterestResultPolicy.NONE);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While registering interest: ", ex);
      }
    }
  });

  // VM2 Put entry (this will cause a create event in both VM1 and VM2)
  vm2.invoke(new CacheSerializableRunnable("Put Value") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.create("key-1", "key-1-create", "key-1-create");
    }
  });

  // VM2 Put entry (this will cause an update event in both VM1 and VM2)
  vm2.invoke(new CacheSerializableRunnable("Put Value") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.put("key-1", "key-1-update", "key-1-update");
    }
  });

  // VM2 Destroy entry (this will cause a destroy event)
  vm2.invoke(new CacheSerializableRunnable("Destroy Entry") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      region.destroy("key-1", "key-1-destroy");
    }
  });

  final SerializableRunnable assertEvents = new CacheSerializableRunnable("Verify events") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      ControlListener listener = (ControlListener) region.getAttributes().getCacheListeners()[0];
      int eventCount = 3;
      listener.waitWhileNotEnoughEvents(60000, eventCount);
      assertEquals(eventCount, listener.events.size());

      final Object key = "key-1";

      // Event 0: the create. Expected values are passed first so failure messages read correctly.
      EventWrapper ew = (EventWrapper) listener.events.get(0);
      assertEquals(TYPE_CREATE, ew.type);
      assertEquals(key, ew.event.getKey());
      assertNull(ew.event.getOldValue());
      assertEquals("key-1-create", ew.event.getNewValue());
      assertEquals(Operation.CREATE, ew.event.getOperation());
      assertEquals("key-1-create", ew.event.getCallbackArgument());
      assertTrue(ew.event.isOriginRemote());

      // Event 1: the update.
      ew = (EventWrapper) listener.events.get(1);
      assertEquals(TYPE_UPDATE, ew.type);
      assertEquals(key, ew.event.getKey());
      assertEquals("key-1-create", ew.event.getOldValue());
      assertEquals("key-1-update", ew.event.getNewValue());
      assertEquals(Operation.UPDATE, ew.event.getOperation());
      assertEquals("key-1-update", ew.event.getCallbackArgument());
      assertTrue(ew.event.isOriginRemote());

      // Event 2: the destroy.
      ew = (EventWrapper) listener.events.get(2);
      assertEquals(TYPE_DESTROY, ew.type);
      assertEquals("key-1-destroy", ew.arg);
      assertEquals(key, ew.event.getKey());
      assertEquals("key-1-update", ew.event.getOldValue());
      assertNull(ew.event.getNewValue());
      assertEquals(Operation.DESTROY, ew.event.getOperation());
      assertEquals("key-1-destroy", ew.event.getCallbackArgument());
      assertTrue(ew.event.isOriginRemote());
    }
  };
  vm1.invoke(assertEvents);

  // Close cache server clients
  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);

  // Stop cache server
  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Tests the keySetOnServer operation of the {@link Pool}.
 *
 * <p>
 * One client asks the server for its key set before any data exists (expecting it to be empty),
 * a second client then puts entries, and the first client asks again and must see one key per
 * entry that was put.
 *
 * @since GemFire 5.0.2
 */
@Test
public void test031KeySetOnServer() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);
  // Number of entries vm1 puts, and therefore the key count vm2 must observe afterwards.
  final int numberOfKeys = 10;

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setConcurrencyChecksEnabled(false);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);
  vm2.invoke(create);

  // Before any puts the server must report an empty (but non-null) key set.
  vm2.invoke(new CacheSerializableRunnable("Get keys on server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      Set keySet = region.keySetOnServer();
      assertNotNull(keySet);
      assertEquals(0, keySet.size());
    }
  });

  vm1.invoke(new CacheSerializableRunnable("Put values") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      for (int i = 0; i < numberOfKeys; i++) {
        region.put(Integer.valueOf(i), Integer.valueOf(i));
      }
    }
  });

  // After the puts from the other client, every key must be reported by the server.
  vm2.invoke(new CacheSerializableRunnable("Get keys on server") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      Set keySet = region.keySetOnServer();
      assertNotNull(keySet);
      assertEquals(numberOfKeys, keySet.size());
    }
  });

  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);
  vm2.invoke(close);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
// NOTE: test032NewConnections makes no assertions, so it is disabled and kept commented out below.
// /**
// * Tests that new connections update client notification connections.
// */
// public void test032NewConnections() throws Exception {
// final String name = this.getName();
// final Host host = Host.getHost(0);
// VM vm0 = host.getVM(0);
// VM vm1 = host.getVM(1);
// VM vm2 = host.getVM(2);
// // Cache server serves up the region
// vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
// public void run2() throws CacheException {
// AttributesFactory factory = getBridgeServerRegionAttributes(null,null);
// Region region = createRegion(name, factory.create());
// pause(1000);
// try {
// startBridgeServer(0);
// } catch (Exception ex) {
// fail("While starting CacheServer", ex);
// }
// }
// });
// final int port =
// vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
// final String host0 = getServerHostName(vm0.getHost());
// SerializableRunnable create =
// new CacheSerializableRunnable("Create region") {
// public void run2() throws CacheException {
// getCache();
// AttributesFactory factory = new AttributesFactory();
// factory.setScope(Scope.LOCAL);
// ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
// createRegion(name, factory.create());
// }
// };
// vm1.invoke(create);
// vm2.invoke(create);
// vm1.invoke(new CacheSerializableRunnable("Create new connection") {
// public void run2() throws CacheException {
// Region region = getRootRegion().getSubregion(name);
// BridgeClient writer = getPoolClient(region);
// Endpoint[] endpoints = (Endpoint[])writer.getEndpoints();
// for (int i=0; i<endpoints.length; i++) endpoints[i].addNewConnection();
// }
// });
// SerializableRunnable close =
// new CacheSerializableRunnable("Close Pool") {
// public void run2() throws CacheException {
// Region region = getRootRegion().getSubregion(name);
// region.localDestroyRegion();
// }
// };
// vm1.invoke(close);
// vm2.invoke(close);
// vm0.invoke(new SerializableRunnable("Stop CacheServer") {
// public void run() {
// stopBridgeServer(getCache());
// }
// });
// }
/**
 * Tests that creating or putting a non-serializable value, or getting with a non-serializable
 * key, fails with an exception whose cause is a {@link java.io.NotSerializableException}.
 *
 * <p>
 * Any other exception fails the test, and is attached as the cause of the failure so that its
 * stack trace is not lost.
 */
@Test
public void test033NotSerializableException() throws CacheException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);

  vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
      createRegion(name, factory.create());
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  });

  final int port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  final String host0 = NetworkUtils.getServerHostName(vm0.getHost());

  SerializableRunnable create = new CacheSerializableRunnable("Create region") {
    @Override
    public void run2() throws CacheException {
      getLonerSystem();
      getCache();
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.LOCAL);
      factory.setConcurrencyChecksEnabled(false);
      ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
      createRegion(name, factory.create());
    }
  };
  vm1.invoke(create);

  vm1.invoke(new CacheSerializableRunnable("Attempt to create a non-serializable value") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      try {
        region.create(Integer.valueOf(1), new ConnectionPoolTestNonSerializable());
        // fail() throws an AssertionError, which the catch (Exception) below does not intercept.
        fail("Should not have been able to create a ConnectionPoolTestNonSerializable");
      } catch (Exception e) {
        if (!(e.getCause() instanceof java.io.NotSerializableException)) {
          org.apache.geode.test.dunit.Assert
              .fail("Unexpected exception while creating a non-serializable value " + e, e);
        }
      }
    }
  });

  vm1.invoke(new CacheSerializableRunnable("Attempt to put a non-serializable value") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      try {
        region.put(Integer.valueOf(1), new ConnectionPoolTestNonSerializable());
        fail("Should not have been able to put a ConnectionPoolTestNonSerializable");
      } catch (Exception e) {
        if (!(e.getCause() instanceof java.io.NotSerializableException)) {
          org.apache.geode.test.dunit.Assert
              .fail("Unexpected exception while putting a non-serializable value " + e, e);
        }
      }
    }
  });

  vm1.invoke(new CacheSerializableRunnable("Attempt to get a non-serializable key") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      try {
        region.get(new ConnectionPoolTestNonSerializable());
        fail("Should not have been able to get a ConnectionPoolTestNonSerializable");
      } catch (Exception e) {
        if (!(e.getCause() instanceof java.io.NotSerializableException)) {
          org.apache.geode.test.dunit.Assert
              .fail("Unexpected exception while getting a non-serializable key " + e, e);
        }
      }
    }
  });

  SerializableRunnable close = new CacheSerializableRunnable("Close Pool") {
    @Override
    public void run2() throws CacheException {
      Region region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  };
  vm1.invoke(close);

  vm0.invoke(new SerializableRunnable("Stop CacheServer") {
    @Override
    public void run() {
      stopBridgeServer(getCache());
    }
  });
}
/**
 * Empty helper type that declares no interfaces and therefore is not {@code Serializable}.
 * {@code test033NotSerializableException} uses instances of it as a region value and as a region
 * key to provoke a {@code java.io.NotSerializableException}.
 */
protected class ConnectionPoolTestNonSerializable {
protected ConnectionPoolTestNonSerializable() {}
}
/**
* Tests 'notify-all' client updates. This test verifies that: - only invalidates are sent as part
* of the 'notify-all' mode of client updates - originators of updates are not sent invalidates -
* non-originators of updates are sent invalidates - multiple invalidates are not sent for the
* same update
*/
@Test
public void test034NotifyAllUpdates() throws CacheException {
final String name = this.getName();
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
disconnectAllFromDS();
// Create the cache servers with distributed, mirrored region
SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
@Override
public void run2() throws CacheException {
CacheLoader cl = new CacheLoader() {
@Override
public Object load(LoaderHelper helper) {
return helper.getKey();
}
@Override
public void close() {
}
};
AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
createRegion(name, factory.create());
// pause(1000);
try {
startBridgeServer(0);
} catch (Exception ex) {
org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
}
}
};
getSystem().getLogWriter().info("before create server");
vm0.invoke(createServer);
vm1.invoke(createServer);
// Create cache server clients
final int numberOfKeys = 10;
final String host0 = NetworkUtils.getServerHostName(host);
final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
SerializableRunnable createClient =
new CacheSerializableRunnable("Create Cache Server Client") {
@Override
public void run2() throws CacheException {
// reset all static listener variables in case this is being rerun in a subclass
numberOfAfterInvalidates = 0;
numberOfAfterCreates = 0;
numberOfAfterUpdates = 0;
getLonerSystem();
// create the region
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(false);
// create bridge writer
ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, vm1Port, true, -1,
-1, null);
Region rgn = createRegion(name, factory.create());
}
};
getSystem().getLogWriter().info("before create client");
vm2.invoke(createClient);
vm3.invoke(createClient);
// Initialize each client with entries (so that afterInvalidate is called)
SerializableRunnable initializeClient = new CacheSerializableRunnable("Initialize Client") {
@Override
public void run2() throws CacheException {
numberOfAfterInvalidates = 0;
numberOfAfterCreates = 0;
numberOfAfterUpdates = 0;
LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
for (int i = 0; i < numberOfKeys; i++) {
assertEquals("key-" + i, region.get("key-" + i));
}
}
};
getSystem().getLogWriter().info("before initialize client");
vm2.invoke(initializeClient);
vm3.invoke(initializeClient);
// Add a CacheListener to both vm2 and vm3
vm2.invoke(new CacheSerializableRunnable("Add CacheListener 1") {
@Override
public void run2() throws CacheException {
LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
CacheListener listener = new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent e) {
numberOfAfterCreates++;
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm2 numberOfAfterCreates: " + numberOfAfterCreates);
}
@Override
public void afterUpdate(EntryEvent e) {
numberOfAfterUpdates++;
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm2 numberOfAfterUpdates: " + numberOfAfterUpdates);
}
@Override
public void afterInvalidate(EntryEvent e) {
numberOfAfterInvalidates++;
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm2 numberOfAfterInvalidates: " + numberOfAfterInvalidates);
}
};
region.getAttributesMutator().addCacheListener(listener);
region.registerInterestRegex(".*", false, false);
}
});
vm3.invoke(new CacheSerializableRunnable("Add CacheListener 2") {
@Override
public void run2() throws CacheException {
LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
CacheListener listener = new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent e) {
numberOfAfterCreates++;
// getLogWriter().info("vm3 numberOfAfterCreates: " + numberOfAfterCreates);
}
@Override
public void afterUpdate(EntryEvent e) {
numberOfAfterUpdates++;
// getLogWriter().info("vm3 numberOfAfterUpdates: " + numberOfAfterUpdates);
}
@Override
public void afterInvalidate(EntryEvent e) {
numberOfAfterInvalidates++;
// getLogWriter().info("vm3 numberOfAfterInvalidates: " + numberOfAfterInvalidates);
}
};
region.getAttributesMutator().addCacheListener(listener);
region.registerInterestRegex(".*", false, false);
}
});
Wait.pause(3000);
getSystem().getLogWriter().info("before puts");
// Use vm2 to put new values
// This should cause 10 afterUpdates to vm2 and 10 afterInvalidates to vm3
vm2.invoke(new CacheSerializableRunnable("Put New Values") {
@Override
public void run2() throws CacheException {
LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
for (int i = 0; i < 10; i++) {
region.put("key-" + i, "key-" + i);
}
}
});
getSystem().getLogWriter().info("after puts");
// Wait to make sure all the updates are received
Wait.pause(1000);
long vm2AfterCreates = vm2.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterCreates());
long vm2AfterUpdates = vm2.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterUpdates());
long vm2AfterInvalidates =
vm2.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterInvalidates());
long vm3AfterCreates = vm3.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterCreates());
long vm3AfterUpdates = vm3.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterUpdates());
long vm3AfterInvalidates =
vm3.invoke(() -> ConnectionPoolDUnitTest.getNumberOfAfterInvalidates());
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm2AfterCreates: " + vm2AfterCreates);
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm2AfterUpdates: " + vm2AfterUpdates);
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm2AfterInvalidates: " + vm2AfterInvalidates);
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm3AfterCreates: " + vm3AfterCreates);
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm3AfterUpdates: " + vm3AfterUpdates);
org.apache.geode.test.dunit.LogWriterUtils.getLogWriter()
.info("vm3AfterInvalidates: " + vm3AfterInvalidates);
assertTrue("VM2 should not have received any afterCreate messages", vm2AfterCreates == 0);
assertTrue("VM2 should not have received any afterInvalidate messages",
vm2AfterInvalidates == 0);
assertTrue("VM2 received " + vm2AfterUpdates + " afterUpdate messages. It should have received "
+ numberOfKeys, vm2AfterUpdates == numberOfKeys);
assertTrue("VM3 should not have received any afterCreate messages", vm3AfterCreates == 0);
assertTrue("VM3 should not have received any afterUpdate messages", vm3AfterUpdates == 0);
assertTrue(
"VM3 received " + vm3AfterInvalidates
+ " afterInvalidate messages. It should have received " + numberOfKeys,
vm3AfterInvalidates == numberOfKeys);
}
/*
 * Disabled test (kept below as commented-out code): test that the "notify by subscription"
 * attribute is unique for each BridgeServer and Gateway.
 */
/*
* public void test035NotifyBySubscriptionIsolation() throws Exception { final String name =
* this.getName(); final Host host = Host.getHost(0); final VM server = host.getVM(3); final VM
* client1 = host.getVM(1); final VM client2 = host.getVM(2);
*
* final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); final int bs1Port =
* ports[0]; final int bs2Port = ports[1]; final int gwPort = ports[2];
*
* final String key1 = "key1-" + name; final String val1 = "val1-" + name; final String key2 =
* "key2-" + name; final String val2 = "val2-" + name;
*
* try { server.invoke(new CacheSerializableRunnable("Setup BridgeServers and Gateway") { public
* void run2() throws CacheException { Cache cache = getCache();
*
* try {
*
* // Create a gateway (which sets notify-by-subscription to true) cache.setGatewayHub(name,
* gwPort).start();
*
* // Start the server that does not have notify-by-subscription (server2) CacheServer bridge2 =
* cache.addCacheServer(); bridge2.setPort(bs2Port); bridge2.setNotifyBySubscription(false);
* String[] noNotifyGroup = {"noNotifyGroup"}; bridge2.setGroups(noNotifyGroup); bridge2.start();
* assertFalse(bridge2.getNotifyBySubscription()); { BridgeServerImpl bsi = (BridgeServerImpl)
* bridge2; AcceptorImpl aci = bsi.getAcceptor();
*
* //assertFalse(aci.getCacheClientNotifier().getNotifyBySubscription()); }
*
* // Start the server that DOES have notify-by-subscription (server1) CacheServer bridge1 =
* cache.addCacheServer(); bridge1.setPort(bs1Port); bridge1.setNotifyBySubscription(true);
* String[] notifyGroup = {"notifyGroup"}; bridge1.setGroups(notifyGroup); bridge1.start();
* assertTrue(bridge1.getNotifyBySubscription()); { BridgeServerImpl bsi = (BridgeServerImpl)
* bridge1; AcceptorImpl aci = bsi.getAcceptor();
* assertTrue(aci.getCacheClientNotifier().getNotifyBySubscription()); }
*
* } catch (IOException ioe) { fail("Setup of BridgeServer test " + name + " failed", ioe ); }
*
* Region r = createRootRegion(name, getRegionAttributes()); r.put(key1, val1); } });
*
* client1.invoke(new
* CacheSerializableRunnable("Test client1 to server with true notify-by-subscription") { public
* void run2() throws CacheException { createLonerDS(); AttributesFactory factory = new
* AttributesFactory(); factory.setScope(Scope.LOCAL);
* ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs1Port,-1,true,-1
* ,-1, "notifyGroup"); factory.setCacheListener(new
* CertifiableTestCacheListener(getLogWriter())); Region r = createRootRegion(name,
* factory.create()); assertNull(r.getEntry(key1)); r.registerInterest(key1);
* assertNotNull(r.getEntry(key1)); assertIndexDetailsEquals(val1, r.getEntry(key1).getValue());
* r.registerInterest(key2); assertNull(r.getEntry(key2)); } });
*
* client2.invoke(new
* CacheSerializableRunnable("Test client2 to server with false notify-by-subscription") { public
* void run2() throws CacheException { createLonerDS(); AttributesFactory factory = new
* AttributesFactory();
* ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs2Port,-1,true,-1
* ,-1, "noNotifyGroup");
*
* factory.setScope(Scope.LOCAL); factory.setCacheListener(new
* CertifiableTestCacheListener(getLogWriter())); Region r = createRootRegion(name,
* factory.create()); assertNull(r.getEntry(key1)); assertIndexDetailsEquals(val1, r.get(key1));
* assertNull(r.getEntry(key2)); r.registerInterest(key2); assertNull(r.getEntry(key2)); } });
*
* server.invoke(new
* CacheSerializableRunnable("Update server with new values for client notification") { public
* void run2() throws CacheException { Region r = getRootRegion(name); assertNotNull(r);
* r.put(key2, val2); // Create a new entry r.put(key1, val2); // Change the first entry } });
*
* client1.invoke(new
* CacheSerializableRunnable("Test update from to server with true notify-by-subscription") {
* public void run2() throws CacheException { Region r = getRootRegion(name); assertNotNull(r);
* CertifiableTestCacheListener ctl = (CertifiableTestCacheListener)
* r.getAttributes().getCacheListener();
*
* ctl.waitForUpdated(key1); assertNotNull(r.getEntry(key1)); assertIndexDetailsEquals(val2,
* r.getEntry(key1).getValue()); // new value should have been pushed
*
* ctl.waitForCreated(key2); assertNotNull(r.getEntry(key2)); // new entry should have been pushed
* assertIndexDetailsEquals(val2, r.getEntry(key2).getValue()); } });
*
* client2.invoke(new
* CacheSerializableRunnable("Test update from server with false notify-by-subscription") { public
* void run2() throws CacheException { Region r = getRootRegion(name); assertNotNull(r);
* CertifiableTestCacheListener ctl = (CertifiableTestCacheListener)
* r.getAttributes().getCacheListener(); ctl.waitForInvalidated(key1);
* assertNotNull(r.getEntry(key1)); assertNull(r.getEntry(key1).getValue()); // Invalidate should
* have been pushed assertIndexDetailsEquals(val2, r.get(key1)); // New value should be fetched
*
* assertNull(r.getEntry(key2)); // assertNull(r.getEntry(key2).getValue());
* assertIndexDetailsEquals(val2, r.get(key2)); // New entry should be fetched } }); tearDown(); }
* finally { // HashSet destroyedRoots = new HashSet(); try { client1.invoke(() ->
* CacheTestCase.remoteTearDown()); client1.invoke(() -> disconnectFromDS()); } finally {
* client2.invoke(() -> CacheTestCase.remoteTearDown()); client2.invoke(() -> disconnectFromDS());
* } } }
*/
// disabled - per Sudhir we don't support multiple bridges in the same VM
// public void test0362BridgeServersWithDiffGroupsInSameVM() throws Exception {
// final String name = this.getName();
// final Host host = Host.getHost(0);
// final VM server = host.getVM(3);
// final VM client1 = host.getVM(1);
// final VM client2 = host.getVM(2);
//
// final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
// final int bs1Port = ports[0];
// final int bs2Port = ports[1];
//
// try {
// server.invoke(new CacheSerializableRunnable("Setup BridgeServers and Gateway") {
// public void run2() throws CacheException
// {
// Cache cache = getCache();
//
// try {
//
// // Start server in group 1
// CacheServer bridge1 = cache.addCacheServer();
// bridge1.setPort(bs1Port);
// String[] group1 = {"zGroup1"};
// bridge1.setGroups(group1);
// bridge1.start();
//
// // start server in group 2
// CacheServer bridge2 = cache.addCacheServer();
// bridge2.setPort(bs2Port);
// bridge2.setNotifyBySubscription(true);
// String[] group2 = {"zGroup2"};
// bridge2.setGroups(group2);
// bridge2.start();
// getLogWriter().info("zGroup1 port should be "+bs1Port+" zGroup2 port should be "+bs2Port);
// } catch (IOException ioe) {
// fail("Setup of BridgeServer test " + name + " failed", ioe );
// }
//
// createRootRegion(name, getRegionAttributes());
// }
// });
//
// client1.invoke(new CacheSerializableRunnable("Test client1 to zGroup2") {
// public void run2() throws CacheException
// {
// createLonerDS();
// AttributesFactory factory = new AttributesFactory();
// factory.setScope(Scope.LOCAL);
// ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs1Port,-1,true,-1,-1,
// "zGroup2");
// Region r = createRootRegion(name, factory.create());
// r.registerInterest("whatever");
// }
// });
//
// client2.invoke(new CacheSerializableRunnable("Test client2 to zGroup1") {
// public void run2() throws CacheException
// {
// createLonerDS();
// AttributesFactory factory = new AttributesFactory();
// ClientServerTestCase.configureConnectionPool(factory,getServerHostName(host),bs2Port,-1,true,-1,-1,
// "zGroup1");
//
// factory.setScope(Scope.LOCAL);
// Region r = createRootRegion(name, factory.create());
// r.registerInterest("whatever");
// }
// });
//
// tearDown();
// } finally {
// try {
// client1.invoke(() -> CacheTestCase.remoteTearDown());
// client1.invoke(() -> disconnectFromDS());
// } finally {
// client2.invoke(() -> CacheTestCase.remoteTearDown());
// client2.invoke(() -> disconnectFromDS());
// }
// }
// }
/**
 * Cache listener that slows down event delivery: every entry and region callback sleeps for a
 * fixed amount of time before returning.
 */
public static class DelayListener extends CacheListenerAdapter {
  /** Time, in milliseconds, that each callback sleeps. */
  private final int delay;

  /**
   * @param delay time in milliseconds to sleep in every callback
   */
  public DelayListener(int delay) {
    this.delay = delay;
  }

  /**
   * Sleeps for the configured delay. If the sleep is interrupted, the interrupt status of the
   * current thread is restored and the test is failed.
   */
  private void delay() {
    try {
      Thread.sleep(this.delay);
    } catch (InterruptedException e) {
      // Thread.sleep clears the interrupt flag when it throws; set it again so that code
      // further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      fail("interrupted");
    }
  }

  @Override
  public void afterCreate(EntryEvent event) {
    delay();
  }

  @Override
  public void afterDestroy(EntryEvent event) {
    delay();
  }

  @Override
  public void afterInvalidate(EntryEvent event) {
    delay();
  }

  @Override
  public void afterRegionDestroy(RegionEvent event) {
    delay();
  }

  @Override
  public void afterRegionCreate(RegionEvent event) {
    delay();
  }

  @Override
  public void afterRegionInvalidate(RegionEvent event) {
    delay();
  }

  @Override
  public void afterUpdate(EntryEvent event) {
    delay();
  }

  @Override
  public void afterRegionClear(RegionEvent event) {
    delay();
  }

  @Override
  public void afterRegionLive(RegionEvent event) {
    delay();
  }
}
/**
 * Make sure a tx done in a server on an empty region gets sent to clients who have registered
 * interest.
 *
 * <p>
 * vm0 creates a DISTRIBUTED_ACK region with {@code DataPolicy.EMPTY} and starts a cache server.
 * vm1 creates a LOCAL region backed by a connection pool to vm0 and registers interest in the
 * regex {@code ".*"}. vm0 then puts three entries inside one transaction, and vm1 waits until
 * its region holds exactly three entries and checks their keys and values.
 */
@Test
public void test037Bug39526part1() throws CacheException, InterruptedException {
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);

  // Create the cache servers with distributed, empty region
  SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.EMPTY);
      factory.setConcurrencyChecksEnabled(false);
      createRegion(name, factory.create());
      // pause(1000);
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  getSystem().getLogWriter().info("before create server");
  vm0.invoke(createServer);

  // Create cache server client
  final String host0 = NetworkUtils.getServerHostName(host);
  final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  SerializableRunnable createClient =
      new CacheSerializableRunnable("Create Cache Server Client") {
        @Override
        public void run2() throws CacheException {
          getLonerSystem();
          // create the region
          AttributesFactory factory = new AttributesFactory();
          factory.setScope(Scope.LOCAL);
          factory.setConcurrencyChecksEnabled(false);
          // create bridge writer
          ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, -1, true, -1, -1,
              null);
          createRegion(name, factory.create());
          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
          region.registerInterestRegex(".*");
        }
      };
  getSystem().getLogWriter().info("before create client");
  vm1.invoke(createClient);

  // now do a tx in the server
  SerializableRunnable doServerTx = new CacheSerializableRunnable("doServerTx") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      Cache cache = getCache();
      CacheTransactionManager txmgr = cache.getCacheTransactionManager();
      txmgr.begin();
      boolean committed = false;
      try {
        region.put("k1", "v1");
        region.put("k2", "v2");
        region.put("k3", "v3");
        txmgr.commit();
        committed = true;
      } finally {
        // Never commit a partially applied transaction: if a put or the commit failed, roll
        // back whatever transaction is still associated with this thread so the original
        // exception propagates instead of being masked by a second failure.
        if (!committed && txmgr.exists()) {
          txmgr.rollback();
        }
      }
    }
  };
  getSystem().getLogWriter().info("before doServerTx");
  vm0.invoke(doServerTx);

  // now verify that the client receives the committed data
  SerializableRunnable validateClient =
      new CacheSerializableRunnable("Validate Cache Server Client") {
        @Override
        public void run2() throws CacheException {
          final LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
          // wait for a while for us to have the correct number of entries
          WaitCriterion ev = new WaitCriterion() {
            @Override
            public boolean done() {
              return region.size() == 3;
            }

            @Override
            public String description() {
              return "waiting for region to be size 3";
            }
          };
          GeodeAwaitility.await().untilAsserted(ev);
          // assertIndexDetailsEquals(3, region.size());
          assertTrue(region.containsKey("k1"));
          assertTrue(region.containsKey("k2"));
          assertTrue(region.containsKey("k3"));
          assertEquals("v1", region.getEntry("k1").getValue());
          assertEquals("v2", region.getEntry("k2").getValue());
          assertEquals("v3", region.getEntry("k3").getValue());
        }
      };
  getSystem().getLogWriter().info("before confirmCommitOnClient");
  vm1.invoke(validateClient);
}
/**
 * Now confirm that a tx done in a peer of a server (the server having an empty region and wanting
 * all events) sends the tx to its clients
 *
 * <p>
 * vm0 creates a DISTRIBUTED_ACK, {@code DataPolicy.EMPTY} region with
 * {@code InterestPolicy.ALL} subscription attributes and starts a cache server. vm1 is a client
 * of vm0 that registers interest in the regex {@code ".*"}. vm2 creates the same region (also
 * empty) as a peer and puts three entries inside one transaction. vm1 then waits until its
 * region holds exactly three entries and checks their keys and values. All VMs are disconnected
 * from the distributed system before the scenario starts and after it completes successfully.
 */
@Test
public void test038Bug39526part2() throws CacheException, InterruptedException {
  disconnectAllFromDS();
  final String name = this.getName();
  final Host host = Host.getHost(0);
  VM vm0 = host.getVM(0);
  VM vm1 = host.getVM(1);
  VM vm2 = host.getVM(2);

  // Create the cache servers with distributed, empty region
  SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setConcurrencyChecksEnabled(false);
      factory.setDataPolicy(DataPolicy.EMPTY);
      factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
      createRegion(name, factory.create());
      // pause(1000);
      try {
        startBridgeServer(0);
      } catch (Exception ex) {
        org.apache.geode.test.dunit.Assert.fail("While starting CacheServer", ex);
      }
    }
  };
  getSystem().getLogWriter().info("before create server");
  vm0.invoke(createServer);

  // Create cache server client
  final String host0 = NetworkUtils.getServerHostName(host);
  final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
  SerializableRunnable createClient =
      new CacheSerializableRunnable("Create Cache Server Client") {
        @Override
        public void run2() throws CacheException {
          getLonerSystem();
          // create the region
          AttributesFactory factory = new AttributesFactory();
          factory.setScope(Scope.LOCAL);
          factory.setConcurrencyChecksEnabled(false);
          // create bridge writer
          ClientServerTestCase.configureConnectionPool(factory, host0, vm0Port, -1, true, -1, -1,
              null);
          createRegion(name, factory.create());
          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
          region.registerInterestRegex(".*");
        }
      };
  getSystem().getLogWriter().info("before create client");
  vm1.invoke(createClient);

  SerializableRunnable createServerPeer = new CacheSerializableRunnable("Create Server Peer") {
    @Override
    public void run2() throws CacheException {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.EMPTY);
      factory.setConcurrencyChecksEnabled(false);
      createRegion(name, factory.create());
    }
  };
  getSystem().getLogWriter().info("before create server peer");
  vm2.invoke(createServerPeer);

  // now do a tx in the server
  SerializableRunnable doServerTx = new CacheSerializableRunnable("doServerTx") {
    @Override
    public void run2() throws CacheException {
      LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
      Cache cache = getCache();
      CacheTransactionManager txmgr = cache.getCacheTransactionManager();
      txmgr.begin();
      boolean committed = false;
      try {
        region.put("k1", "v1");
        region.put("k2", "v2");
        region.put("k3", "v3");
        txmgr.commit();
        committed = true;
      } finally {
        // Never commit a partially applied transaction: if a put or the commit failed, roll
        // back whatever transaction is still associated with this thread so the original
        // exception propagates instead of being masked by a second failure.
        if (!committed && txmgr.exists()) {
          txmgr.rollback();
        }
      }
    }
  };
  getSystem().getLogWriter().info("before doServerTx");
  vm2.invoke(doServerTx);

  // @todo verify server received it but to do this need a listener in
  // the server
  // now verify that the client receives the committed data
  SerializableRunnable validateClient =
      new CacheSerializableRunnable("Validate Cache Server Client") {
        @Override
        public void run2() throws CacheException {
          final LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
          // wait for a while for us to have the correct number of entries
          WaitCriterion ev = new WaitCriterion() {
            @Override
            public boolean done() {
              return region.size() == 3;
            }

            @Override
            public String description() {
              return "waiting for region to be size 3";
            }
          };
          GeodeAwaitility.await().untilAsserted(ev);
          // assertIndexDetailsEquals(3, region.size());
          assertTrue(region.containsKey("k1"));
          assertTrue(region.containsKey("k2"));
          assertTrue(region.containsKey("k3"));
          assertEquals("v1", region.getEntry("k1").getValue());
          assertEquals("v2", region.getEntry("k2").getValue());
          assertEquals("v3", region.getEntry("k3").getValue());
        }
      };
  getSystem().getLogWriter().info("before confirmCommitOnClient");
  vm1.invoke(validateClient);
  disconnectAllFromDS();
}
/**
 * Small {@link DataSerializable} value object holding a single {@code int} index. Its serialized
 * form is exactly that one int.
 */
static class Order implements DataSerializable {
  /** The only piece of state; set through {@link #init(int)} or {@link #fromData(DataInput)}. */
  int index;

  /**
   * Public no-arg constructor; leaves {@code index} at its default of 0.
   * NOTE(review): presumably needed so the serialization framework can instantiate the class -
   * confirm before removing.
   */
  public Order() {}

  /**
   * Assigns the index carried by this object.
   *
   * @param index the value to store
   */
  public void init(int index) {
    this.index = index;
  }

  /**
   * @return the currently stored index
   */
  public int getIndex() {
    return this.index;
  }

  /** Writes the index to {@code out} as a single int. */
  @Override
  public void toData(DataOutput out) throws IOException {
    out.writeInt(this.index);
  }

  /** Reads a single int from {@code in} and stores it as the index. */
  @Override
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    this.index = in.readInt();
  }
}
}