blob: 41b48080105a711eb5877eaf899811b37efb0a4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.ClientServerTest;
/**
* This tests cases where we have both a connection pool and a bridge loader.
*/
@Category({ClientServerTest.class})
public class ConnectionPoolAndLoaderDUnitTest extends JUnit4CacheTestCase {
private static int bridgeServerPort;
private boolean useLocator;
@Override
public final void preTearDownCacheTestCase() {
disconnectAllFromDS();
}
/**
* Tests that we can have both a connection pool and a bridge loader. The expected order of
* operations for get is. get from server load from loader.
*
* Anything that is loaded on the client is put on the server..
*/
@Test
public void testPoolAndLoader() throws Exception {
final String regionName = this.getName();
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws IOException {
Cache cache = getCache();
AttributesFactory af = new AttributesFactory();
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
startBridgeServer(serverPort, true);
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() {
Cache cache = getCache();
PoolFactory factory = PoolManager.createFactory();
factory.addServer(NetworkUtils.getServerHostName(host), serverPort);
factory.create("pool1");
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.DEFAULT);
af.setScope(Scope.LOCAL);
af.setPoolName("pool1");
af.setCacheLoader(new MyCacheLoader("loaded"));
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
return null;
}
});
client.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
region.put("a", "put-a");
region.put("b", "put-b");
assertEquals("loaded-c", region.get("c"));
assertEquals("loaded-d", region.get("d"));
}
});
server.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
assertEquals("put-a", region.get("a"));
assertEquals("put-b", region.get("b"));
assertEquals("loaded-c", region.get("c"));
assertEquals("loaded-d", region.get("d"));
region.put("e", "server-e");
}
});
client.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
assertEquals("put-a", region.get("a"));
assertEquals("put-b", region.get("b"));
assertEquals("loaded-c", region.get("c"));
assertEquals("loaded-d", region.get("d"));
assertEquals("server-e", region.get("e"));
}
});
}
/**
* Test the we can have both a connection pool and a cache writer.
*
* The expected order of operations for put is: local writer put on server
*/
@Test
public void testPoolAndWriter() throws Exception {
final String regionName = this.getName();
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client = host.getVM(1);
final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws IOException {
Cache cache = getCache();
AttributesFactory af = new AttributesFactory();
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
startBridgeServer(serverPort, true);
return null;
}
});
client.invoke(new SerializableCallable() {
@Override
public Object call() {
Cache cache = getCache();
PoolFactory factory = PoolManager.createFactory();
factory.addServer(NetworkUtils.getServerHostName(host), serverPort);
factory.create("pool1");
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.DEFAULT);
af.setScope(Scope.LOCAL);
af.setPoolName("pool1");
af.setCacheWriter(new MyCacheWriter());
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
return null;
}
});
client.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
MyCacheWriter writer = (MyCacheWriter) region.getAttributes().getCacheWriter();
region.put("a", "a");
region.put("b", "b");
region.put("c", "c");
region.destroy("c");
writer.throwException = true;
try {
region.put("a", "new-a");
fail("Should have gotten a cache writer exception");
} catch (CacheWriterException e) {
assertEquals("beforeUpdate", e.getMessage());
}
try {
region.destroy("b");
fail("Should have gotten a cache writer exception");
} catch (CacheWriterException e) {
assertEquals("beforeDestroy", e.getMessage());
}
try {
region.put("d", "d");
fail("Should have gotten a cache writer exception");
} catch (CacheWriterException e) {
assertEquals("beforeCreate", e.getMessage());
}
try {
region.clear();
fail("Should have gotten a cache writer exception");
} catch (CacheWriterException e) {
assertEquals("beforeRegionClear", e.getMessage());
}
try {
region.destroyRegion();
fail("Should have gotten a cache writer exception");
} catch (CacheWriterException e) {
assertEquals("beforeRegionDestroy", e.getMessage());
}
}
});
server.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
assertEquals("a", region.get("a"));
assertEquals("b", region.get("b"));
assertEquals(null, region.get("c"));
assertEquals(null, region.get("d"));
}
});
}
/**
* Test that we can have a peer, a server, and a bridge loader for the same region.
*
* Expected order 1 localCache 2 peer 3 server 4 loader
*/
@Test
public void testPoolLoadAndPeer() throws Exception {
final String regionName = this.getName();
final Host host = Host.getHost(0);
VM server = host.getVM(0);
VM client1 = host.getVM(1);
VM client2 = host.getVM(2);
final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.invoke(new SerializableCallable() {
@Override
public Object call() throws IOException {
Cache cache = getCache();
AttributesFactory af = new AttributesFactory();
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
startBridgeServer(serverPort, true);
return null;
}
});
SerializableCallable createClient1 = new SerializableCallable() {
@Override
public Object call() {
// Make sure we get a distributed system that has the locator
useLocator = true;
Cache cache = getCache();
useLocator = false;
PoolFactory factory = PoolManager.createFactory();
factory.addServer(NetworkUtils.getServerHostName(host), serverPort);
factory.create("pool1");
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setPoolName("pool1");
af.setCacheLoader(new MyCacheLoader("loaded1"));
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
return null;
}
};
client1.invoke(createClient1);
SerializableCallable createClient2 = new SerializableCallable() {
@Override
public Object call() {
// Make sure we get a distributed system that has the locator
useLocator = true;
Cache cache = getCache();
useLocator = false;
PoolFactory factory = PoolManager.createFactory();
factory.addServer(NetworkUtils.getServerHostName(host), serverPort);
factory.create("pool1");
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.NORMAL);
af.setScope(Scope.DISTRIBUTED_ACK);
af.setCacheLoader(new MyCacheLoader("loaded2"));
af.setPoolName("pool1");
RegionAttributes attrs = af.create();
cache.createRegion(regionName, attrs);
return null;
}
};
client2.invoke(createClient2);
// We need to test what happens when
// we do a load in client1 in each of these cases:
// Case On Server On Client 1 On Client 2 Expected
// a X server
// b X X client1
// c X X X client1
// d X X client2
// e X client1
// f X X client1
// g X client2 (loader does a netSearch)
// h client1 loader
// Setup scenarios
client1.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
region.put("b", "client1-b");
region.put("c", "client1-c");
region.put("e", "client1-e");
region.put("f", "client1-f");
}
});
client2.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
assertEquals("client1-c", region.get("c"));
region.put("d", "client2-d");
assertEquals("client1-f", region.get("f"));
region.put("g", "client2-g");
}
});
server.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
region.put("a", "server-a");
region.localDestroy("e");
region.localDestroy("f");
region.localDestroy("g");
}
});
// Test the scenarios
client1.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
assertEquals("server-a", region.get("a"));
assertEquals("client1-b", region.get("b"));
assertEquals("client1-c", region.get("c"));
assertEquals("client2-d", region.get("d"));
assertEquals("client1-e", region.get("e"));
assertEquals("client1-f", region.get("f"));
assertEquals("client2-g", region.get("g"));
assertEquals("loaded1-h", region.get("h"));
}
});
server.invoke(new SerializableRunnable() {
@Override
public void run() {
Region region = getRootRegion(regionName);
assertEquals("server-a", region.get("a"));
assertEquals("client1-b", region.get("b"));
assertEquals("client1-c", region.get("c"));
assertEquals("client2-d", region.get("d"));
assertEquals(null, region.get("e"));
assertEquals(null, region.get("f"));
// dsmith - This result seems somewhat suspect. client1 did a net load
// which found a value in client2, but it never propagated that result
// to the server. After talking with Darrel we decided to keep it this
// way for now.
assertEquals(null, region.get("g"));
assertEquals("loaded1-h", region.get("h"));
}
});
}
private void startBridgeServer(int port, boolean notifyBySubscription) throws IOException {
Cache cache = getCache();
CacheServer bridge = cache.addCacheServer();
bridge.setPort(port);
bridge.setNotifyBySubscription(notifyBySubscription);
bridge.start();
bridgeServerPort = bridge.getPort();
}
@Override
public Properties getDistributedSystemProperties() {
Properties p = new Properties();
if (!useLocator) {
p.setProperty(LOCATORS, "");
p.setProperty(MCAST_PORT, "0");
}
return p;
}
private static class MyCacheWriter extends CacheWriterAdapter {
protected boolean throwException = false;
@Override
public void beforeCreate(EntryEvent event) throws CacheWriterException {
if (throwException) {
throw new CacheWriterException("beforeCreate");
}
}
@Override
public void beforeDestroy(EntryEvent event) throws CacheWriterException {
if (throwException) {
throw new CacheWriterException("beforeDestroy");
}
}
@Override
public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
if (throwException) {
throw new CacheWriterException("beforeRegionClear");
}
}
@Override
public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
if (throwException) {
throw new CacheWriterException("beforeRegionDestroy");
}
}
@Override
public void beforeUpdate(EntryEvent event) throws CacheWriterException {
if (throwException) {
throw new CacheWriterException("beforeUpdate");
}
}
}
private static class MyCacheLoader implements CacheLoader {
private String message;
public MyCacheLoader(String message) {
this.message = message;
}
@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
if (helper.getRegion().getAttributes().getScope().equals(Scope.DISTRIBUTED_ACK)) {
System.err.println("Doing a net search for " + helper.getKey());
Object result = helper.netSearch(false);
System.err.println("Net search found " + result);
if (result != null) {
return result;
}
}
Object key = helper.getKey();
return message + "-" + key;
}
@Override
public void close() {}
}
}