blob: ff9ecc1787042165659aa37a4a485589fcfdbcfe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
/**
* Make sure that operations are distributed and done in regions remote from a PROXY
*
* @since GemFire 5.0
*/
public class ProxyDUnitTest extends JUnit4CacheTestCase {
private transient Region r;
private transient DistributedMember otherId;
protected transient int clInvokeCount;
protected transient CacheEvent clLastEvent;
public ProxyDUnitTest() {
super();
}
private VM getOtherVm() {
Host host = Host.getHost(0);
return host.getVM(0);
}
private void initOtherId() {
VM vm = getOtherVm();
vm.invoke(new CacheSerializableRunnable("Connect") {
@Override
public void run2() throws CacheException {
getCache();
}
});
otherId = vm.invoke(() -> getSystem().getDistributedMember());
}
private void doCreateOtherVm() {
VM vm = getOtherVm();
vm.invoke(new CacheSerializableRunnable("create root") {
@Override
public void run2() throws CacheException {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(DataPolicy.REPLICATE);
af.setScope(Scope.DISTRIBUTED_ACK);
createRootRegion("ProxyDUnitTest", af.create());
}
});
}
////////////////////// Test Methods //////////////////////
/**
* check distributed ops that originate in a PROXY are correctly distributed to non-proxy regions.
*/
private void distributedOps(DataPolicy dp, InterestPolicy ip) throws CacheException {
initOtherId();
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(dp);
af.setSubscriptionAttributes(new SubscriptionAttributes(ip));
af.setScope(Scope.DISTRIBUTED_ACK);
Region r = createRootRegion("ProxyDUnitTest", af.create());
doCreateOtherVm();
r.put("putkey", "putvalue1");
getOtherVm().invoke(new CacheSerializableRunnable("check put") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(true, r.containsKey("putkey"));
assertEquals("putvalue1", r.getEntry("putkey").getValue());
r.put("putkey", "putvalue2");
}
});
assertEquals(false, r.containsKey("putkey"));
assertEquals("putvalue2", r.get("putkey")); // netsearch
r.invalidate("putkey");
getOtherVm().invoke(new CacheSerializableRunnable("check invalidate") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(true, r.containsKey("putkey"));
assertEquals(null, r.getEntry("putkey").getValue());
}
});
assertEquals(null, r.get("putkey")); // invalid so total miss
r.destroy("putkey");
getOtherVm().invoke(new CacheSerializableRunnable("check destroy") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(false, r.containsKey("putkey"));
}
});
assertEquals(null, r.get("putkey")); // total miss
r.create("createKey", "createValue1");
getOtherVm().invoke(new CacheSerializableRunnable("check create") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(true, r.containsKey("createKey"));
assertEquals("createValue1", r.getEntry("createKey").getValue());
}
});
{
Map m = new HashMap();
m.put("putAllKey1", "putAllValue1");
m.put("putAllKey2", "putAllValue2");
r.putAll(m, "putAllCallback");
}
getOtherVm().invoke(new CacheSerializableRunnable("check putAll") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(true, r.containsKey("putAllKey1"));
assertEquals("putAllValue1", r.getEntry("putAllKey1").getValue());
assertEquals(true, r.containsKey("putAllKey2"));
assertEquals("putAllValue2", r.getEntry("putAllKey2").getValue());
}
});
r.clear();
getOtherVm().invoke(new CacheSerializableRunnable("check clear") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(0, r.size());
}
});
getOtherVm().invoke(new CacheSerializableRunnable("install CacheWriter") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
AttributesMutator am = r.getAttributesMutator();
CacheWriter cw = new CacheWriterAdapter() {
@Override
public void beforeCreate(EntryEvent event) throws CacheWriterException {
throw new CacheWriterException("expected");
}
};
am.setCacheWriter(cw);
}
});
try {
r.put("putkey", "putvalue");
fail("expected CacheWriterException");
} catch (CacheWriterException ignored) {
}
getOtherVm().invoke(new CacheSerializableRunnable("check clear") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(0, r.size());
}
});
assertEquals(null, r.get("loadkey")); // total miss
getOtherVm().invoke(new CacheSerializableRunnable("install CacheLoader") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
AttributesMutator am = r.getAttributesMutator();
am.setCacheWriter(null); // clear csche writer
CacheLoader cl = new CacheLoader() {
@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
if (helper.getKey().equals("loadkey")) {
return "loadvalue";
} else if (helper.getKey().equals("loadexception")) {
throw new CacheLoaderException("expected");
} else {
return null;
}
}
@Override
public void close() {}
};
am.setCacheLoader(cl);
}
});
assertEquals("loadvalue", r.get("loadkey")); // net load
assertEquals(null, r.get("foobar")); // total miss
try {
r.get("loadexception");
fail("expected CacheLoaderException");
} catch (CacheLoaderException ignored) {
}
r.destroyRegion();
getOtherVm().invoke(new CacheSerializableRunnable("check clear") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(null, r);
}
});
}
/**
* Gets the DMStats for the vm's DM
*/
private DMStats getDMStats() {
return getCache().getDistributionManager().getStats();
}
/**
* check remote ops done in a normal vm are correctly distributed to PROXY regions
*/
private void remoteOriginOps(DataPolicy dp, InterestPolicy ip) throws CacheException {
initOtherId();
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(dp);
af.setSubscriptionAttributes(new SubscriptionAttributes(ip));
af.setScope(Scope.DISTRIBUTED_ACK);
CacheListener cl1 = new CacheListener() {
@Override
public void afterUpdate(EntryEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterCreate(EntryEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterInvalidate(EntryEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterDestroy(EntryEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterRegionInvalidate(RegionEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterRegionDestroy(RegionEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterRegionClear(RegionEvent e) {
clLastEvent = e;
clInvokeCount++;
}
@Override
public void afterRegionCreate(RegionEvent e) {}
@Override
public void afterRegionLive(RegionEvent e) {}
@Override
public void close() {}
};
af.addCacheListener(cl1);
Region r = createRootRegion("ProxyDUnitTest", af.create());
clInvokeCount = 0;
doCreateOtherVm();
DMStats stats = getDMStats();
long receivedMsgs = stats.getReceivedMessages();
if (ip.isAll()) {
getOtherVm().invoke(new CacheSerializableRunnable("do put") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.put("p", "v");
}
});
assertEquals(1, clInvokeCount);
assertEquals(Operation.CREATE, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertEquals(null, ((EntryEvent) clLastEvent).getOldValue());
assertEquals(false, ((EntryEvent) clLastEvent).isOldValueAvailable()); // failure
assertEquals("v", ((EntryEvent) clLastEvent).getNewValue());
assertEquals("p", ((EntryEvent) clLastEvent).getKey());
clInvokeCount = 0;
getOtherVm().invoke(new CacheSerializableRunnable("do create") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.create("c", "v");
}
});
assertEquals(1, clInvokeCount);
assertEquals(Operation.CREATE, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertEquals(null, ((EntryEvent) clLastEvent).getOldValue());
assertEquals(false, ((EntryEvent) clLastEvent).isOldValueAvailable());
assertEquals("v", ((EntryEvent) clLastEvent).getNewValue());
assertEquals("c", ((EntryEvent) clLastEvent).getKey());
clInvokeCount = 0;
getOtherVm().invoke(new CacheSerializableRunnable("do update") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.put("c", "v2");
}
});
assertEquals(1, clInvokeCount);
assertEquals(Operation.UPDATE, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertEquals(null, ((EntryEvent) clLastEvent).getOldValue());
assertEquals(false, ((EntryEvent) clLastEvent).isOldValueAvailable());
assertEquals("v2", ((EntryEvent) clLastEvent).getNewValue());
assertEquals("c", ((EntryEvent) clLastEvent).getKey());
clInvokeCount = 0;
getOtherVm().invoke(new CacheSerializableRunnable("do invalidate") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.invalidate("c");
}
});
assertEquals(1, clInvokeCount);
assertEquals(Operation.INVALIDATE, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertEquals(null, ((EntryEvent) clLastEvent).getOldValue());
assertEquals(false, ((EntryEvent) clLastEvent).isOldValueAvailable());
assertEquals(null, ((EntryEvent) clLastEvent).getNewValue());
assertEquals("c", ((EntryEvent) clLastEvent).getKey());
clInvokeCount = 0;
getOtherVm().invoke(new CacheSerializableRunnable("do destroy") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.destroy("c");
}
});
assertEquals(1, clInvokeCount);
assertEquals(Operation.DESTROY, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertEquals(null, ((EntryEvent) clLastEvent).getOldValue());
assertEquals(false, ((EntryEvent) clLastEvent).isOldValueAvailable());
assertEquals(null, ((EntryEvent) clLastEvent).getNewValue());
assertEquals("c", ((EntryEvent) clLastEvent).getKey());
clInvokeCount = 0;
getOtherVm().invoke(new CacheSerializableRunnable("do putAll") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
Map m = new HashMap();
m.put("putAllKey1", "putAllValue1");
m.put("putAllKey2", "putAllValue2");
r.putAll(m);
}
});
assertEquals(2, clInvokeCount);
// @todo darrel; check putAll events
clInvokeCount = 0;
getOtherVm().invoke(new CacheSerializableRunnable("do netsearch") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals(null, r.get("loadkey")); // total miss
}
});
assertEquals(0, clInvokeCount);
} else {
getOtherVm().invoke(new CacheSerializableRunnable("do entry ops") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.put("p", "v");
r.create("c", "v");
r.put("c", "v"); // update
r.invalidate("c");
r.destroy("c");
{
Map m = new HashMap();
m.put("putAllKey1", "putAllValue1");
m.put("putAllKey2", "putAllValue2");
r.putAll(m);
}
assertEquals(null, r.get("loadkey")); // total miss
}
});
assertEquals(0, clInvokeCount);
assertEquals(0, r.size());
// check the stats to make sure none of the above sent up messages
assertEquals(receivedMsgs, stats.getReceivedMessages());
}
{
AttributesMutator am = r.getAttributesMutator();
CacheLoader cl = new CacheLoader() {
@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
if (helper.getKey().equals("loadkey")) {
return "loadvalue";
} else if (helper.getKey().equals("loadexception")) {
throw new CacheLoaderException("expected");
} else {
return null;
}
}
@Override
public void close() {}
};
am.setCacheLoader(cl);
}
receivedMsgs = stats.getReceivedMessages();
getOtherVm().invoke(new CacheSerializableRunnable("check net loader") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
assertEquals("loadvalue", r.get("loadkey")); // net load
assertEquals(null, r.get("foobar")); // total miss
try {
r.get("loadexception");
fail("expected CacheLoaderException");
} catch (CacheLoaderException ignored) {
}
}
});
assertTrue(stats.getReceivedMessages() > receivedMsgs);
if (ip.isAll()) {
assertEquals(1, clInvokeCount);
assertEquals(Operation.NET_LOAD_CREATE, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertEquals(null, ((EntryEvent) clLastEvent).getOldValue());
assertEquals(false, ((EntryEvent) clLastEvent).isOldValueAvailable());
clInvokeCount = 0;
} else {
assertEquals(0, clInvokeCount);
}
{
AttributesMutator am = r.getAttributesMutator();
am.setCacheLoader(null);
CacheWriter cw = new CacheWriterAdapter() {
@Override
public void beforeCreate(EntryEvent event) throws CacheWriterException {
throw new CacheWriterException("expected");
}
};
am.setCacheWriter(cw);
}
receivedMsgs = stats.getReceivedMessages();
getOtherVm().invoke(new CacheSerializableRunnable("check net write") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
try {
r.put("putkey", "putvalue");
fail("expected CacheWriterException");
} catch (CacheWriterException ignored) {
}
}
});
assertTrue(stats.getReceivedMessages() > receivedMsgs);
{
AttributesMutator am = r.getAttributesMutator();
am.setCacheWriter(null);
}
assertEquals(0, clInvokeCount);
clLastEvent = null;
getOtherVm().invoke(new CacheSerializableRunnable("check region invalidate") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.invalidateRegion();
}
});
assertEquals(1, clInvokeCount);
assertEquals(Operation.REGION_INVALIDATE, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
clLastEvent = null;
getOtherVm().invoke(new CacheSerializableRunnable("check region clear") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.clear();
}
});
assertEquals(2, clInvokeCount);
assertEquals(Operation.REGION_CLEAR, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
clLastEvent = null;
getOtherVm().invoke(new CacheSerializableRunnable("check region destroy") {
@Override
public void run2() throws CacheException {
Region r = getRootRegion("ProxyDUnitTest");
r.destroyRegion();
}
});
assertEquals(3, clInvokeCount);
assertEquals(Operation.REGION_DESTROY, clLastEvent.getOperation());
assertEquals(true, clLastEvent.isOriginRemote());
assertEquals(otherId, clLastEvent.getDistributedMember());
assertTrue(r.isDestroyed());
}
@Test
public void testDistributedOpsPROXY() throws CacheException {
distributedOps(DataPolicy.EMPTY, InterestPolicy.CACHE_CONTENT);
}
@Test
public void testRemoteOriginOpsPROXY() throws CacheException {
remoteOriginOps(DataPolicy.EMPTY, InterestPolicy.CACHE_CONTENT);
}
@Test
public void testRemoteOriginOpsPROXY_ALL() throws CacheException {
remoteOriginOps(DataPolicy.EMPTY, InterestPolicy.ALL);
}
}