/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.pdx;

import static org.apache.geode.internal.Assert.assertTrue;
import static org.apache.geode.internal.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TransactionEvent;
import org.apache.geode.cache.TransactionListener;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.TransactionWriterException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.internal.cache.DistributedCacheOperation;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PeerTypeRegistration;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DUnitBlackboard;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.SerializationTest;

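/**
* Distributed tests for PDX serialization: creation and replication of the PDX type registry,
* transactional puts, persistence of the registry across cache restarts, and reads that arrive
* before the required PDX enum metadata has been processed.
*/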
@Category({SerializationTest.class})
public class PdxSerializableDUnitTest extends JUnit4CacheTestCase {
private static final SimpleClass ANOBJECT = new SimpleClass(57, (byte) 3);
private static final String TEST_REGION_NAME = "testSimplePdx";
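/**
* Puts a PdxSerializable value in one member of a replicated region, reads it back in a second
* member, and checks in a third member that the PDX type registry region was created.
*/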
@Test
public void testSimplePut() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
VM vm3 = host.getVM(2);
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
createRootRegion(TEST_REGION_NAME, af.create());
return null;
}
};
vm1.invoke(createRegion);
vm2.invoke(createRegion);
vm3.invoke(createRegion);
vm1.invoke(() -> doSimplePut(false));
vm2.invoke(() -> verifySimplePut());
vm3.invoke(new SerializableCallable("check for PDX") {
@Override
public Object call() throws Exception {
assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
return null;
}
});
}
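/** Creates the partitioned test region in the calling member. */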
private void createPR() {
getCache().createRegionFactory(RegionShortcut.PARTITION)
.setPartitionAttributes(new PartitionAttributesFactory<Integer, Object>().create())
.create(TEST_REGION_NAME);
}
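/**
* Same scenario as {@link #testSimplePut()}, but on a partitioned region and with the put done
* inside a transaction.
*/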
@Test
public void testSimplePutOnPRWithTx() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
VM vm3 = host.getVM(2);
vm1.invoke(() -> createPR());
vm2.invoke(() -> createPR());
vm3.invoke(() -> createPR());
vm1.invoke(() -> doSimplePut(true));
vm2.invoke(() -> verifySimplePut());
vm3.invoke(() -> {
assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
});
}
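/**
* Verifies that the transaction listener and writer are not handed events for the internal PDX
* type registry region when a PdxSerializable value is put inside a transaction.
*/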
@Test
public void testTransactionCallbacksNotInvoked() {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
SerializableCallable createRegionAndAddPoisonedListener = new SerializableCallable() {
@Override
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.REPLICATE);
createRootRegion(TEST_REGION_NAME, af.create());
addPoisonedTransactionListeners();
return null;
}
};
vm1.invoke(createRegionAndAddPoisonedListener);
vm2.invoke(createRegionAndAddPoisonedListener);
vm1.invoke(() -> doSimplePut(true));
vm2.invoke(() -> verifySimplePut());
}
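/**
* Verifies that a persistent PDX type registry using the default disk store survives a cache
* restart.
*/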
@Test
public void testPersistenceDefaultDiskStore() throws Throwable {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
// Make sure the type registry is persistent
CacheFactory cf = new CacheFactory();
cf.setPdxPersistent(true);
getCache(cf);
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
createRootRegion(TEST_REGION_NAME, af.create());
return null;
}
};
persistenceTest(createRegion);
}
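/**
* Verifies that a persistent PDX type registry backed by an explicitly named disk store survives
* a cache restart.
*/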
@Test
public void testPersistenceExplicitDiskStore() throws Throwable {
SerializableCallable createRegion = new SerializableCallable() {
@Override
public Object call() throws Exception {
// Make sure the type registry is persistent
CacheFactory cf = new CacheFactory();
cf.setPdxPersistent(true);
cf.setPdxDiskStore("store1");
Cache cache = getCache(cf);
cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(getDiskDirs())
.create("store1");
AttributesFactory af = new AttributesFactory();
af.setScope(Scope.DISTRIBUTED_ACK);
af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
af.setDiskStoreName("store1");
createRootRegion(TEST_REGION_NAME, af.create());
return null;
}
};
persistenceTest(createRegion);
}
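/**
* Puts a PdxSerializable value in one member, verifies it in another, closes both caches,
* recovers the region and type registry from disk, and verifies the value can still be
* deserialized; a late-joining third member must also see the recovered type registry.
*/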
private void persistenceTest(SerializableCallable createRegion) throws Throwable {
Host host = Host.getHost(0);
VM vm1 = host.getVM(0);
VM vm2 = host.getVM(1);
VM vm3 = host.getVM(2);
vm1.invoke(createRegion);
vm2.invoke(createRegion);
vm1.invoke(() -> doSimplePut(false));
final SerializableCallable checkForObject = new SerializableCallable() {
@Override
public Object call() throws Exception {
Region r = getRootRegion(TEST_REGION_NAME);
assertEquals(ANOBJECT, r.get(1));
// Ok, now the type registry should exist
assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
return null;
}
};
vm2.invoke(checkForObject);
SerializableCallable closeCache = new SerializableCallable() {
@Override
public Object call() throws Exception {
closeCache();
return null;
}
};
// Close the cache in both VMs
vm1.invoke(closeCache);
vm2.invoke(closeCache);
// Now recreate the region, recovering from disk
AsyncInvocation future1 = vm1.invokeAsync(createRegion);
AsyncInvocation future2 = vm2.invokeAsync(createRegion);
future1.getResult();
future2.getResult();
// Make sure we can still find and deserialize the result.
vm1.invoke(checkForObject);
vm2.invoke(checkForObject);
// Make sure a late comer can still create the type registry.
vm3.invoke(createRegion);
vm3.invoke(checkForObject);
}
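/**
* Exercises a read of a PDX value whose new enum metadata has not yet been processed: a message
* observer in VM1 stalls the type registry update while VM0 puts a value that uses a new enum,
* and VM1 then performs a get() on that value.
*/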
@Test
public void testVmWaitsForPdxType() throws Throwable {
VM vm0 = Host.getHost(0).getVM(0);
VM vm1 = Host.getHost(0).getVM(1);
getBlackboard().initBlackboard();
final Properties properties = getDistributedSystemProperties();
properties.put("conserve-sockets", "false");
// steps:
// 1 create two caches and define a PdxType
// 2 install a block in VM1 that delays receipt of new PDX types
// 3 update the value of the PdxInstance in VM0 using a new Enum type
// 4 get the value in VM1
// The result should be that step 4 hangs unless the bug is fixed
vm0.invoke("create cache", () -> {
Cache cache = getCache(properties);
Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create("testRegion");
region.put("TestObject", new TestPdxObject("aString", 1, 1.0, TestPdxObject.AnEnum.ONE));
});
vm1.invoke("create cache and region", () -> {
Cache cache = getCache(properties);
// note that initial image transfer in testRegion will cause the object to be serialized in vm0
// and populate the PdxRegion in this vm
cache.createRegionFactory(RegionShortcut.REPLICATE).create("testRegion");
// this message observer delays processing of new PDX type/enum registrations in this VM
final DUnitBlackboard bb = getBlackboard();
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage msg) {
if (msg instanceof DistributedCacheOperation.CacheOperationMessage) {
try {
DistributedCacheOperation.CacheOperationMessage cmsg =
(DistributedCacheOperation.CacheOperationMessage) msg;
String path = cmsg.getRegionPath();
if (path.equals(PeerTypeRegistration.REGION_FULL_PATH)) {
System.out
.println("message observer found a PDX update message and is stalling: " + msg);
try {
bb.signalGate("listenerWasInvoked");
bb.waitForGate("pdxObjectGetStarting", 3, TimeUnit.SECONDS);
// let the get() commence and block
Thread.sleep(30000);
System.out.println("message observer after sleep ");
} catch (InterruptedException e) {
System.out.println("message observer is done stalling1 ");
bb.setMailbox("listenerProblem", e);
} catch (TimeoutException e) {
System.out.println("message observer is done stalling2");
bb.setMailbox("listenerProblem", e);
} finally {
System.out.println("message observer is done stalling");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
});
AsyncInvocation async0 = vm0.invokeAsync("propagate value with new pdx enum type", () -> {
Cache cache = getCache(properties);
final Region pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH);
final DUnitBlackboard bb = getBlackboard();
// now we register a new Id for our enum in a different thread. This will
// block in vm1 due to its message observer
Thread t = new Thread("PdxSerializableDUnitTest async thread") {
@Override
public void run() {
bb.signalGate("asyncThreadReady");
// pdxRegion.put(new EnumId(0x3010101), new EnumInfo(TestPdxObject.AnEnum.TWO));
((GemFireCacheImpl) cache).getPdxRegistry().addRemoteEnum(0x3010101,
new EnumInfo(TestPdxObject.AnEnum.TWO));
}
};
t.setDaemon(true);
t.start();
try {
bb.waitForGate("asyncThreadReady", 20, TimeUnit.SECONDS);
} catch (TimeoutException e) {
fail("timed out");
}
// reserialization will use the new Enumeration PDX type
Region region = cache.getRegion("testRegion");
bb.waitForGate("listenerWasInvoked", 20, TimeUnit.SECONDS);
region.put("TestObject", new TestPdxObject("TestObject'", 2, 2.0, TestPdxObject.AnEnum.TWO));
System.out.println("TestObject added put");
bb.signalGate("pdxObjectPut");
});
// vm0 has sent a new TestObject but vm1 does not have the enum type needed to
// deserialize it.
AsyncInvocation async1 = vm1.invokeAsync("try to read object w/o enum type", () -> {
DUnitBlackboard bb = getBlackboard();
bb.waitForGate("pdxObjectPut", 10, TimeUnit.SECONDS);
Region region = getCache(properties).getRegion("testRegion");
bb.signalGate("pdxObjectGetStarting");
Object testObject = region.get("TestObject");
System.out.println("found " + testObject);
});
DUnitBlackboard bb = getBlackboard();
try {
async0.join(20000);
async1.join(10000);
if (async0.exceptionOccurred()) {
throw async0.getException();
}
if (async1.exceptionOccurred()) {
throw async1.getException();
}
assertTrue(bb.isGateSignaled("listenerWasInvoked"));
/*
* Throwable throwable = (Throwable)bb.getMailbox("listenerProblem"); if (throwable != null) {
* RuntimeException rte = new RuntimeException("message observer had a problem", throwable);
* throw rte; }
*/
} finally {
bb.signalGate("pdxObjectGetStarting");
bb.signalGate("pdxObjectPut");
bb.initBlackboard();
}
}
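/**
* Simple PdxSerializable test value with a nested enum, used to trigger PDX type and enum
* registration.
*/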
public static class TestPdxObject implements org.apache.geode.pdx.PdxSerializable {
public String stringVar;
public int intVar;
public double floatVar;
public enum AnEnum {
ONE, TWO
}
public AnEnum enumVar;
public TestPdxObject() {}
public TestPdxObject(String stringVar, int intVar, double floatVar, AnEnum enumVar) {
this.stringVar = stringVar;
this.intVar = intVar;
this.floatVar = floatVar;
this.enumVar = enumVar;
}
@Override
public void toData(final PdxWriter writer) {
writer.writeString("stringVar", stringVar).writeInt("intVar", intVar)
.writeDouble("floatVar", floatVar).writeObject("enumVar", this.enumVar);
}
@Override
public void fromData(final PdxReader reader) {
stringVar = reader.readString("stringVar");
intVar = reader.readInt("intVar");
floatVar = reader.readDouble("floatVar");
enumVar = (AnEnum) reader.readObject("enumVar");
}
@Override
public String toString() {
return "TestPdxObject [stringVar=" + stringVar + ", intVar=" + intVar + ", floatVar="
+ floatVar + ", enumVar=" + enumVar + "]";
}
}
/**
* Adds a transaction listener and writer that throw an exception when invoked with events for
* internal regions.
*/
public void addPoisonedTransactionListeners() {
MyTestTransactionListener listener = new MyTestTransactionListener();
getCache().getCacheTransactionManager().addListener(listener);
getCache().getCacheTransactionManager().setWriter(listener);
}
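/**
* Puts the test object into the test region, optionally inside a transaction, and asserts that
* the PDX type registry region exists afterwards.
*/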
private void doSimplePut(boolean useTransaction) {
// Check to make sure the type region is not yet created
Region r = getRootRegion(TEST_REGION_NAME);
if (useTransaction) {
getCache().getCacheTransactionManager().begin();
}
try {
r.put(1, ANOBJECT);
} finally {
if (useTransaction) {
getCache().getCacheTransactionManager().commit();
}
}
// Ok, now the type registry should exist
assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
}
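/**
* Asserts that the PDX type registry region exists and that the previously put value can be read
* back and deserialized.
*/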
private void verifySimplePut() {
// Ok, now the type registry should exist
assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
Region r = getRootRegion(TEST_REGION_NAME);
assertEquals(ANOBJECT, r.get(1));
}
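/**
* Transaction listener and writer that throws if any transaction event is for the internal PDX
* types region.
*/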
private static class MyTestTransactionListener implements TransactionWriter, TransactionListener {
private MyTestTransactionListener() {}
private void checkEvent(TransactionEvent event) {
List<CacheEvent<?, ?>> events = event.getEvents();
System.out.println("MyTestTransactionListener.checkEvent: events are " + events);
for (CacheEvent<?, ?> cacheEvent : events) {
if (((LocalRegion) cacheEvent.getRegion()).isPdxTypesRegion()) {
throw new UnsupportedOperationException("found internal event: " + cacheEvent + " region="
+ cacheEvent.getRegion().getName());
}
}
}
@Override
public void beforeCommit(TransactionEvent event) throws TransactionWriterException {
checkEvent(event);
}
@Override
public void close() {}
@Override
public void afterCommit(TransactionEvent event) {
checkEvent(event);
}
@Override
public void afterFailedCommit(TransactionEvent event) {
checkEvent(event);
}
@Override
public void afterRollback(TransactionEvent event) {
checkEvent(event);
}
}
}