blob: 10b3fe783c69c950f65957320040c8f1a4465791 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache30;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
/**
* This class tests various search load and write scenarios for distributed regions
*/
@SuppressWarnings({"deprecation", "unchecked", "rawtypes", "serial"})
public class SearchAndLoadDUnitTest extends JUnit4CacheTestCase {
static boolean loaderInvoked;
static boolean remoteLoaderInvoked;
static int remoteLoaderInvokedCount;
static boolean netSearchCalled;
static boolean netSearchHit;
static boolean netWriteInvoked;
static boolean operationWasCreate;
static boolean originWasRemote;
static int writerInvocationCount;
/** A <code>CacheListener</code> used by a test */
protected static TestCacheListener listener;
/** A <code>CacheLoader</code> used by a test */
protected static TestCacheLoader loader;
/** A <code>CacheWriter</code> used by a test */
protected static TestCacheWriter writer;
static boolean exceptionThrown;
static final CountDownLatch readyForExceptionLatch = new CountDownLatch(1);
static final CountDownLatch loaderInvokedLatch = new CountDownLatch(1);
@Override
public final void preTearDownCacheTestCase() throws Exception {
for (int h = 0; h < Host.getHostCount(); h++) {
Host host = Host.getHost(h);
for (int v = 0; v < host.getVMCount(); v++) {
host.getVM(v).invoke(new SerializableRunnable("Clean up") {
@Override
public void run() {
cleanup();
}
});
}
}
cleanup();
}
/**
* Clears fields used by a test
*/
protected static void cleanup() {
listener = null;
loader = null;
writer = null;
}
/**
* Returns region attributes for a <code>GLOBAL</code> region
*/
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
return factory.create();
}
@Test
public void testNetSearch() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "NetSearchKey";
final Integer value = new Integer(440);
vm0.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setStatisticsEnabled(true);
Region region = createRegion(name, factory.create());
region.create(objectName, null);
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setStatisticsEnabled(true);
Region region = createRegion(name, factory.create());
region.put(objectName, value);
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm2.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setStatisticsEnabled(true);
Region region = createRegion(name, factory.create());
region.create(objectName, null);
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm0.invoke(new SerializableRunnable("Get a value") {
@Override
public void run() {
try {
Object result = null;
result = getRootRegion().getSubregion(name).get(objectName);
assertEquals(value, result);
// System.err.println("Results is " + result.toString() + " Key is " +
// objectName.toString());
} catch (CacheLoaderException cle) {
Assert.fail("While Get a value", cle);
} catch (TimeoutException te) {
Assert.fail("While Get a value", te);
}
}
});
}
/**
* This test is for a bug in which a cache loader threw an exception that caused the wrong value
* to be put in a Future in nonTxnFindObject. This in turn caused a concurrent search for the
* object to not invoke the loader a second time.
*
* VM0 is used to create a cache and a region having a loader that simulates the conditions that
* caused the bug. One async thread then does a get() which invokes the loader. Another async
* thread does a get() which reaches nonTxnFindObject and blocks waiting for the first thread's
* load to complete. The loader then throws an exception that is sent back to the first thread.
* The second thread should then cause the loader to be invoked again, and this time the loader
* will return a value. Both threads then validate that they received the expected result.
*/
@Test
public void testConcurrentLoad() throws Throwable {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
final String name = this.getUniqueName() + "Region";
final String objectName = "theKey";
final Integer value = new Integer(44);
final String exceptionString = "causing first cache-load to fail";
remoteLoaderInvoked = false;
loaderInvoked = false;
vm0.invoke(new CacheSerializableRunnable("create region " + name + " in vm0") {
@Override
public void run2() {
remoteLoaderInvoked = false;
loaderInvoked = false;
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(true);
factory.setCacheLoader(new CacheLoader() {
boolean firstInvocation = true;
@Override
public synchronized Object load(LoaderHelper helper) {
System.out.println("invoked cache loader for " + helper.getKey());
loaderInvoked = true;
loaderInvokedLatch.countDown();
if (firstInvocation) {
firstInvocation = false;
try {
// wait for both threads to be ready for the exception to be thrown
System.out.println("waiting for vm0t2 to be ready before throwing exception");
readyForExceptionLatch.await(30, TimeUnit.SECONDS);
// give the second thread time to get into loader code
Thread.sleep(5000);
} catch (InterruptedException e) {
fail("interrupted");
}
System.out.println("throwing exception");
exceptionThrown = true;
throw new RuntimeException(exceptionString);
}
System.out.println("returning value=" + value);
return value;
}
@Override
public void close() {
}
});
Region region = createRegion(name, factory.create());
region.create(objectName, null);
IgnoredException.addIgnoredException(exceptionString);
}
});
AsyncInvocation async1 = null;
try {
async1 = vm0.invokeAsync(new CacheSerializableRunnable(
"Concurrently invoke the remote loader on the same key - t1") {
@Override
public void run2() {
Region region = getCache().getRegion("root/" + name);
LogWriterUtils.getLogWriter().info("t1 is invoking get(" + objectName + ")");
try {
LogWriterUtils.getLogWriter().info("t1 retrieved value " + region.get(objectName));
fail("first load should have triggered an exception");
} catch (RuntimeException e) {
if (!e.getMessage().contains(exceptionString)) {
throw e;
}
}
}
});
vm0.invoke(
new CacheSerializableRunnable("Concurrently invoke the loader on the same key - t2") {
@Override
public void run2() {
final Region region = getCache().getRegion("root/" + name);
final Object[] valueHolder = new Object[1];
// wait for vm1 to cause the loader to be invoked
LogWriterUtils.getLogWriter().info("t2 is waiting for loader to be invoked by t1");
try {
loaderInvokedLatch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
fail("interrupted");
}
assertTrue(loaderInvoked);
Thread t = new Thread("invoke get()") {
@Override
public void run() {
try {
valueHolder[0] = region.get(objectName);
} catch (RuntimeException e) {
valueHolder[0] = e;
}
}
};
t.setDaemon(true);
t.start();
try {
// let the thread get to the point of blocking on vm1's Future
// in LocalRegion.nonTxnFindObject()
Thread.sleep(5000);
} catch (InterruptedException e) {
fail("interrupted");
}
readyForExceptionLatch.countDown();
try {
t.join(30000);
} catch (InterruptedException e) {
fail("interrupted");
}
if (t.isAlive()) {
t.interrupt();
fail("get() operation blocked for too long - test needs some work");
}
LogWriterUtils.getLogWriter().info("t2 is invoking get(" + objectName + ")");
Object value = valueHolder[0];
if (value instanceof RuntimeException) {
if (((Exception) value).getMessage().contains(exceptionString)) {
fail("second load should not have thrown an exception");
} else {
throw (RuntimeException) value;
}
} else {
LogWriterUtils.getLogWriter().info("t2 retrieved value " + value);
assertNotNull(value);
}
}
});
} finally {
if (async1 != null) {
async1.join();
if (async1.exceptionOccurred()) {
throw async1.getException();
}
}
}
}
@Test
public void testNetLoadNoLoaders() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "B";
SerializableRunnable create = new CacheSerializableRunnable("Create Region") {
@Override
public void run2() throws CacheException {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
createRegion(name, factory.create());
}
};
vm0.invoke(create);
vm1.invoke(create);
vm0.invoke(new SerializableRunnable("Get with No Loaders defined") {
@Override
public void run() {
try {
Object result = getRootRegion().getSubregion(name).get(objectName);
assertNull(result);
} catch (CacheLoaderException cle) {
Assert.fail("While getting value for ACK region", cle);
} catch (TimeoutException te) {
Assert.fail("While getting value for ACK region", te);
}
}
});
}
@Test
public void testNetLoad() throws CacheException, InterruptedException {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "B";
final Integer value = new Integer(43);
loaderInvoked = false;
remoteLoaderInvoked = false;
vm0.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
try {
loaderInvoked = false;
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
// factory.setCacheLoader(new CacheLoader() {
// public Object load(LoaderHelper helper) {
/// loaderInvoked = true;
// return value;
// }
//
// public void close() {
//
// }
// });
Region region = createRegion(name, factory.create());
region.create(objectName, null);
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setCacheLoader(new CacheLoader() {
@Override
public Object load(LoaderHelper helper) {
remoteLoaderInvoked = true;
return value;
}
@Override
public void close() {
}
});
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm0.invoke(new SerializableRunnable("Get a value from remote loader") {
@Override
public void run() {
for (int i = 0; i < 1; i++) {
try {
Object result = getRootRegion().getSubregion(name).get(objectName);
assertEquals(value, result);
assertEquals(new Boolean(loaderInvoked), Boolean.FALSE);
// getRootRegion().getSubregion(name).invalidate(objectName);
} catch (CacheLoaderException cle) {
Assert.fail("While getting value for ACK region", cle);
}
/*
* catch(EntryNotFoundException enfe) { fail("While getting value for ACK region", enfe);
*
* }
*/
catch (TimeoutException te) {
Assert.fail("While getting value for ACK region", te);
}
}
}
});
}
/**
* Confirm that a netLoad that returns null will NOT allow other netLoad methods to be called.
*/
@Test
public void testEmptyNetLoad() throws CacheException, InterruptedException {
disconnectAllFromDS();
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "B";
loaderInvoked = false;
remoteLoaderInvoked = false;
remoteLoaderInvokedCount = 0;
vm0.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
loaderInvoked = false;
remoteLoaderInvoked = false;
remoteLoaderInvokedCount = 0;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
// factory.setCacheLoader(new CacheLoader() {
// public Object load(LoaderHelper helper) {
/// loaderInvoked = true;
// return value;
// }
//
// public void close() {
//
// }
// });
Region region = createRegion(name, factory.create());
region.create(objectName, null);
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
SerializableRunnable installLoader = new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
loaderInvoked = false;
remoteLoaderInvoked = false;
remoteLoaderInvokedCount = 0;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setCacheLoader(new CacheLoader() {
@Override
public Object load(LoaderHelper helper) {
remoteLoaderInvoked = true;
remoteLoaderInvokedCount++;
return null;
}
@Override
public void close() {
}
});
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
};
vm1.invoke(installLoader);
vm2.invoke(installLoader);
vm0.invoke(new SerializableRunnable("Get a value from remote loader") {
@Override
public void run() {
for (int i = 0; i < 1; i++) {
try {
Object result = getRootRegion().getSubregion(name).get(objectName);
assertEquals(null, result);
assertEquals(false, loaderInvoked);
// getRootRegion().getSubregion(name).invalidate(objectName);
} catch (CacheLoaderException cle) {
Assert.fail("While getting value for ACK region", cle);
}
/*
* catch(EntryNotFoundException enfe) { fail("While getting value for ACK region", enfe);
*
* }
*/
catch (TimeoutException te) {
Assert.fail("While getting value for ACK region", te);
}
}
}
});
// we only invoke one netLoad loader even when they return null.
boolean xor = vmRemoteLoaderInvoked(vm1) ^ vmRemoteLoaderInvoked(vm2);
assertEquals(
"vm1=" + vmRemoteLoaderInvoked(vm1) + " vm2=" + vmRemoteLoaderInvoked(vm2) + " vm1Count="
+ vmRemoteLoaderInvokedCount(vm1) + " vm2Count=" + vmRemoteLoaderInvokedCount(vm2),
true, xor);
int total = vmRemoteLoaderInvokedCount(vm1) + vmRemoteLoaderInvokedCount(vm2);
assertEquals(
"vm1=" + vmRemoteLoaderInvokedCount(vm1) + " vm2=" + vmRemoteLoaderInvokedCount(vm2), 1,
total);
}
public static boolean vmRemoteLoaderInvoked(VM vm) {
Boolean v = (Boolean) vm.invoke(() -> SearchAndLoadDUnitTest.fetchRemoteLoaderInvoked());
return v.booleanValue();
}
public static int vmRemoteLoaderInvokedCount(VM vm) {
Integer v = (Integer) vm.invoke(() -> SearchAndLoadDUnitTest.fetchRemoteLoaderInvokedCount());
return v.intValue();
}
public static Boolean fetchRemoteLoaderInvoked() {
return Boolean.valueOf(remoteLoaderInvoked);
}
public static Integer fetchRemoteLoaderInvokedCount() {
return new Integer(remoteLoaderInvokedCount);
}
@Test
public void testLocalLoad() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "C";
final Integer value = new Integer(44);
remoteLoaderInvoked = false;
loaderInvoked = false;
vm0.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
remoteLoaderInvoked = false;
loaderInvoked = false;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setCacheLoader(new CacheLoader() {
@Override
public Object load(LoaderHelper helper) {
loaderInvoked = true;
return value;
}
@Override
public void close() {
}
});
Region region = createRegion(name, factory.create());
region.create(objectName, null);
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
remoteLoaderInvoked = false;
loaderInvoked = false;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
factory.setCacheLoader(new CacheLoader() {
@Override
public Object load(LoaderHelper helper) {
remoteLoaderInvoked = true;
return value;
}
@Override
public void close() {
}
});
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm0.invoke(new SerializableRunnable("Get a value from local loader") {
@Override
public void run() {
try {
Object result = getRootRegion().getSubregion(name).get(objectName);
assertEquals(value, result);
assertEquals(new Boolean(loaderInvoked), Boolean.TRUE);
assertEquals(new Boolean(remoteLoaderInvoked), Boolean.FALSE);
} catch (CacheLoaderException cle) {
} catch (TimeoutException te) {
}
}
});
}
@Test
public void testNetWrite() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = this.getUniqueName() + "-ACK";
final String objectName = "Gemfire7";
final Integer value = new Integer(483);
vm0.invoke(new SerializableRunnable("Create ACK Region with cacheWriter") {
@Override
public void run() {
netWriteInvoked = false;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setCacheWriter(new CacheWriter() {
@Override
public void beforeCreate(EntryEvent e) throws CacheWriterException {
netWriteInvoked = true;
return;
}
@Override
public void beforeUpdate(EntryEvent e) throws CacheWriterException {
netWriteInvoked = true;
return;
}
@Override
public void beforeDestroy(EntryEvent e) throws CacheWriterException {
return;
}
@Override
public void beforeRegionDestroy(RegionEvent e) throws CacheWriterException {
return;
}
@Override
public void beforeRegionClear(RegionEvent e) throws CacheWriterException {
return;
}
@Override
public void close() {}
});
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create ACK Region") {
@Override
public void run() {
loaderInvoked = false;
remoteLoaderInvoked = false;
netWriteInvoked = false;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating ACK region", ex);
}
}
});
vm1.invoke(new SerializableRunnable(
"Do a put operation resulting in cache writer notification in other vm") {
@Override
public void run() {
try {
getRootRegion().getSubregion(name).put(objectName, value);
try {
Object result = getRootRegion().getSubregion(name).get(objectName);
assertEquals(result, value);
} catch (CacheLoaderException cle) {
} catch (TimeoutException te) {
}
} catch (CacheWriterException cwe) {
} catch (TimeoutException te) {
}
}
});
vm0.invoke(new SerializableRunnable("ensure that cache writer was invoked") {
@Override
public void run() {
assertTrue("expected cache writer to be invoked", netWriteInvoked);
}
});
}
@Test
public void testOneHopNetWrite() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
final String name = this.getUniqueName() + "Region";
final String objectName = "Object7";
final Integer value = new Integer(483);
final Integer updateValue = new Integer(484);
vm0.invoke(new SerializableRunnable("Create replicated region with cacheWriter") {
@Override
public void run() {
netWriteInvoked = false;
operationWasCreate = false;
originWasRemote = false;
writerInvocationCount = 0;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
factory.setCacheWriter(new CacheWriter() {
@Override
public void beforeCreate(EntryEvent e) throws CacheWriterException {
e.getRegion().getCache().getLogger()
.info("cache writer beforeCreate invoked for " + e);
netWriteInvoked = true;
operationWasCreate = true;
originWasRemote = e.isOriginRemote();
writerInvocationCount++;
return;
}
@Override
public void beforeUpdate(EntryEvent e) throws CacheWriterException {
e.getRegion().getCache().getLogger()
.info("cache writer beforeUpdate invoked for " + e);
netWriteInvoked = true;
operationWasCreate = false;
originWasRemote = e.isOriginRemote();
writerInvocationCount++;
return;
}
@Override
public void beforeDestroy(EntryEvent e) throws CacheWriterException {}
@Override
public void beforeRegionDestroy(RegionEvent e) throws CacheWriterException {}
@Override
public void beforeRegionClear(RegionEvent e) throws CacheWriterException {}
@Override
public void close() {}
});
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating replicated region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create empty Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.EMPTY);
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating empty region", ex);
}
}
});
vm1.invoke(new SerializableRunnable(
"do a put that should be proxied in the other vm and invoke its cache writer") {
@Override
public void run() {
try {
getRootRegion().getSubregion(name).put(objectName, value);
} catch (CacheWriterException cwe) {
} catch (TimeoutException te) {
}
}
});
vm0.invoke(new SerializableRunnable(
"ensure that cache writer was invoked with correct settings in event") {
@Override
public void run() {
assertTrue("expected cache writer to be invoked", netWriteInvoked);
assertTrue("expected originRemote to be true", originWasRemote);
assertTrue("expected event to be create", operationWasCreate);
assertEquals("expected only one cache writer invocation", 1, writerInvocationCount);
// set flags for the next test - updating the same key
netWriteInvoked = false;
writerInvocationCount = 0;
}
});
vm1.invoke(new SerializableRunnable(
"do an update that should be proxied in the other vm and invoke its cache writer") {
@Override
public void run() {
try {
getRootRegion().getSubregion(name).put(objectName, updateValue);
} catch (CacheWriterException cwe) {
} catch (TimeoutException te) {
}
}
});
vm0.invoke(new SerializableRunnable(
"ensure that cache writer was invoked with correct settings in event") {
@Override
public void run() {
assertTrue("expected cache writer to be invoked", netWriteInvoked);
assertTrue("expected originRemote to be true", originWasRemote);
assertTrue("expected event to be create", operationWasCreate);
assertEquals("expected only one cache writer invocation", 1, writerInvocationCount);
}
});
}
/** same as the previous test but the cache writer is in a third, non-replicated, vm */
@Test
public void testOneHopNetWriteRemoteWriter() throws CacheException, InterruptedException {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
final String name = this.getUniqueName() + "Region";
final String objectName = "Object7";
final Integer value = new Integer(483);
final Integer updateValue = new Integer(484);
vm0.invoke(new SerializableRunnable("Create replicate Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating empty region", ex);
}
}
});
vm1.invoke(new SerializableRunnable("Create empty Region") {
@Override
public void run() {
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.EMPTY);
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating empty region", ex);
}
}
});
vm2.invoke(new SerializableRunnable("Create replicated region with cacheWriter") {
@Override
public void run() {
netWriteInvoked = false;
operationWasCreate = false;
originWasRemote = false;
writerInvocationCount = 0;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setCacheWriter(new CacheWriter() {
@Override
public void beforeCreate(EntryEvent e) throws CacheWriterException {
e.getRegion().getCache().getLogger()
.info("cache writer beforeCreate invoked for " + e);
netWriteInvoked = true;
operationWasCreate = true;
originWasRemote = e.isOriginRemote();
writerInvocationCount++;
return;
}
@Override
public void beforeUpdate(EntryEvent e) throws CacheWriterException {
e.getRegion().getCache().getLogger()
.info("cache writer beforeUpdate invoked for " + e);
netWriteInvoked = true;
operationWasCreate = false;
originWasRemote = e.isOriginRemote();
writerInvocationCount++;
return;
}
@Override
public void beforeDestroy(EntryEvent e) throws CacheWriterException {}
@Override
public void beforeRegionDestroy(RegionEvent e) throws CacheWriterException {}
@Override
public void beforeRegionClear(RegionEvent e) throws CacheWriterException {}
@Override
public void close() {}
});
createRegion(name, factory.create());
} catch (CacheException ex) {
Assert.fail("While creating replicated region", ex);
}
}
});
vm1.invoke(new SerializableRunnable(
"do a put that should be proxied in the other vm and invoke its cache writer") {
@Override
public void run() {
try {
getRootRegion().getSubregion(name).put(objectName, value);
} catch (CacheWriterException cwe) {
} catch (TimeoutException te) {
}
}
});
vm2.invoke(new SerializableRunnable(
"ensure that cache writer was invoked with correct settings in event") {
@Override
public void run() {
assertTrue("expected cache writer to be invoked", netWriteInvoked);
assertTrue("expected originRemote to be true", originWasRemote);
assertTrue("expected event to be create", operationWasCreate);
assertEquals("expected only one cache writer invocation", 1, writerInvocationCount);
// set flags for the next test - updating the same key
netWriteInvoked = false;
writerInvocationCount = 0;
}
});
vm1.invoke(new SerializableRunnable(
"do an update that should be proxied in the other vm and invoke its cache writer") {
@Override
public void run() {
try {
getRootRegion().getSubregion(name).put(objectName, updateValue);
} catch (CacheWriterException cwe) {
} catch (TimeoutException te) {
}
}
});
vm2.invoke(new SerializableRunnable(
"ensure that cache writer was invoked with correct settings in event") {
@Override
public void run() {
assertTrue("expected cache writer to be invoked", netWriteInvoked);
assertTrue("expected originRemote to be true", originWasRemote);
assertTrue("expected event to be create", operationWasCreate);
assertEquals("expected only one cache writer invocation", 1, writerInvocationCount);
}
});
}
}