blob: 31caf256b472039c11dc7ac8338ba668d3812785 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.pdx;
import static org.apache.geode.cache.InterestResultPolicy.KEYS_VALUES;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.SerializationException;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.SerializationTest;
/**
* A test to ensure that we do not deserialize PDX objects when we shouldn't
*
* This test is trying to cover all of the possible access paths.
*
*
*/
@Category({SerializationTest.class})
public class PdxDeserializationDUnitTest extends JUnit4CacheTestCase {
public PdxDeserializationDUnitTest() {
super();
}
/**
* Test that we don't deserialize objects on a remote peer when performing operations.
*/
@Test
public void testP2P() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
doTest(vm0, vm1);
}
/**
* Test to make sure we don't deserialize objects on a server that is a datastore
*/
@Test
public void testClientToDataStore() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
doTest(vm2, vm1);
}
/**
* Test to make sure we don't deserialize objects on a server that is an accessor.
*/
@Test
public void testClientToAccessor() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
VM vm3 = host.getVM(3);
doTest(vm3, vm1);
}
/**
* Test to make sure we don't deserialize objects on a client that has registered interest.
*/
@Test
public void testAccessorToClient() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(1);
VM vm3 = host.getVM(2);
doTest(vm1, vm3);
}
/**
* This test creates the following topology
*
* vm0 a peer accessor for both a PR and a replicate region vm1 a peer data store for a replicate
* region and a PR vm2 a client of vm0 vm3 a client vm1
*
* The test then performs all region operations in operationVM, while deserialization is prevented
* in disallowDeserializationVM.
*
* If an operation causes a deserialization in the disallow VM, this test will fail.
*
*/
private void doTest(VM operationVM, VM disallowDeserializationVM) {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
// Create an accessor
final int port0 = (Integer) vm0.invoke(new SerializableCallable() {
@Override
public Object call() {
Cache cache = getCache();
CacheServer server = createCacheServer(cache);
cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY).create("replicate");
cache.createRegionFactory(RegionShortcut.PARTITION_PROXY).create("pr");
cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY).create("overflow_replicate");
cache.createRegionFactory(RegionShortcut.PARTITION_PROXY).create("overflow_pr");
return server.getPort();
}
});
// Create a datastore
final int port1 = (Integer) vm1.invoke(new SerializableCallable() {
@Override
public Object call() {
Cache cache = getCache();
CacheServer server = createCacheServer(cache);
cache.createRegionFactory(RegionShortcut.REPLICATE).setCacheLoader(new TestCacheLoader())
.create("replicate");
cache.createRegionFactory(RegionShortcut.PARTITION).setCacheLoader(new TestCacheLoader())
.create("pr");
cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).setMaxOplogSize(1)
.create("store");
// these regions will test that faulting in an object
// from disk doesn't cause an issue.
cache.createRegionFactory(RegionShortcut.REPLICATE_OVERFLOW)
.setEvictionAttributes(
EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK))
.setDiskStoreName("store").setCacheLoader(new TestCacheLoader())
.create("overflow_replicate");
cache.createRegionFactory(RegionShortcut.PARTITION_OVERFLOW)
.setEvictionAttributes(
EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK))
.setDiskStoreName("store").setCacheLoader(new TestCacheLoader()).create("overflow_pr");
return server.getPort();
}
});
// create a client connected to the accessor
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
createClient(port0);
return null;
}
});
// create a client connected to the datastore
vm3.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
createClient(port1);
return null;
}
});
// Disallow deserialization
disallowDeserializationVM.invoke(new SerializableRunnable() {
@Override
public void run() {
TestSerializable.throwExceptionOnDeserialization = true;
}
});
// perform operations in the target VM
try {
operationVM.invoke(new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
doOperations(cache.getRegion("replicate"));
doOperations(cache.getRegion("pr"));
doOperations(cache.getRegion("overflow_replicate"));
doOperations(cache.getRegion("overflow_pr"));
}
});
} finally {
// Ok, now allow deserialization.
disallowDeserializationVM.invoke(new SerializableRunnable() {
@Override
public void run() {
TestSerializable.throwExceptionOnDeserialization = false;
}
});
}
// Sanity Check to make sure the values not in some weird form
// on the actual datastore.
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
checkValues(cache.getRegion("replicate"));
checkValues(cache.getRegion("pr"));
checkValues(cache.getRegion("overflow_replicate"));
checkValues(cache.getRegion("overflow_pr"));
}
});
// Make sure the clients receive keys they have registered interest in
checkRegisterInterestValues(vm2);
checkRegisterInterestValues(vm3);
}
private void checkRegisterInterestValues(VM vm2) {
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
Cache cache = getCache();
checkClientValue(cache.getRegion("replicate"));
checkClientValue(cache.getRegion("pr"));
checkClientValue(cache.getRegion("overflow_replicate"));
checkClientValue(cache.getRegion("overflow_pr"));
}
});
}
protected void checkClientValue(final Region<Object, Object> region) {
// Because register interest is asynchronous, we need to wait for the value to arrive.
GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
@Override
public boolean done() {
return region.get("A") != null;
}
@Override
public String description() {
return "Client region never received value for key A";
}
});
assertEquals(TestSerializable.class, region.get("A").getClass());
// do a register interest which will download the value
region.registerInterest("B", KEYS_VALUES);
assertEquals(TestSerializable.class, region.get("B").getClass());
}
private void doOperations(Region<Object, Object> region) {
// Do a put and a get
region.put("A", new TestSerializable());
assertEquals(TestSerializable.class, region.get("A").getClass());
// Do a cache load
assertEquals(TestSerializable.class, region.get("B").getClass());
// Make sure the cache load is in the right object form
assertEquals(TestSerializable.class, region.get("B").getClass());
// If we're a client region, try a register interest
if (region.getAttributes().getPoolName() != null) {
region.registerInterest(".*", InterestResultPolicy.KEYS_VALUES);
}
// Do a query
try {
SelectResults queryResults = (SelectResults) getCache().getQueryService()
.newQuery("select * from " + region.getFullPath()).execute();
for (Object result : queryResults.asList()) {
assertEquals(TestSerializable.class, result.getClass());
}
} catch (Exception e) {
Assert.fail("got exception from query", e);
}
// TODO Transactions don't work
// CacheTransactionManager txManager = getCache().getCacheTransactionManager();
// //Test puts and get in a transaction
// txManager.begin();
// region.put("C", new TestSerializable());
// assertIndexDetailsEquals(TestSerializable.class, region.get("C").getClass());
// txManager.commit();
//
// txManager.begin();
// assertIndexDetailsEquals(TestSerializable.class, region.get("C").getClass());
// txManager.commit();
//
//
// //Test cache load in a transaction
// txManager.begin();
// assertIndexDetailsEquals(TestSerializable.class, region.get("D").getClass());
// txManager.commit();
//
// txManager.begin();
// assertIndexDetailsEquals(TestSerializable.class, region.get("D").getClass());
// txManager.commit();
}
private void checkValues(Region<Object, Object> region) {
assertEquals(TestSerializable.class, region.get("A").getClass());
assertEquals(TestSerializable.class, region.get("B").getClass());
// TODO Transactions don't work
// assertIndexDetailsEquals(TestSerializable.class, region.get("C").getClass());
// assertIndexDetailsEquals(TestSerializable.class, region.get("D").getClass());
}
private CacheServer createCacheServer(Cache cache) {
CacheServer server = cache.addCacheServer();
server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
try {
server.start();
} catch (IOException e) {
Assert.fail("got exception", e);
}
return server;
}
private void createClient(final int port0) {
ClientCacheFactory cf = new ClientCacheFactory();
cf.addPoolServer("localhost", port0);
cf.setPoolSubscriptionEnabled(true);
ClientCache cache = getClientCache(cf);
Region replicate =
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("replicate");
Region pr = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("pr");
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
.create("overflow_replicate");
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("overflow_pr");
// Register interest in a key
replicate.registerInterest("A", InterestResultPolicy.KEYS_VALUES);
pr.registerInterest("A", InterestResultPolicy.KEYS_VALUES);
}
public static class TestCacheLoader implements CacheLoader {
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
return new TestSerializable();
}
}
/**
* Test PDX object. This object will fail to be deserialized in the target VM.
*/
public static class TestSerializable implements PdxSerializable {
private static boolean throwExceptionOnDeserialization = false;
@Override
public void toData(PdxWriter writer) {
// TODO Auto-generated method stub
}
@Override
public void fromData(PdxReader reader) {
if (throwExceptionOnDeserialization) {
throw new SerializationException("Deserialization should not be happening in this VM");
}
}
}
}