blob: 12c14d2240e48c2ca9c0ae8b3996da141561a505 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
/*
* ClearRvvLockingDUnitTest.java
*
* Created on September 6, 2005, 2:57 PM
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.test.dunit.Assert.fail;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.logging.log4j.Logger;
import org.assertj.core.api.JUnitSoftAssertions;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
/**
* Test class to verify proper RVV locking interaction between entry operations such as PUT/REMOVE
* and the CLEAR region operation
*
* GEODE-599: After an operation completed, it would unlock the RVV. This was occurring before the
* operation was distributed to other members which created a window in which another operation
* could be performed prior to that operation being distributed.
*
* The fix for GEODE-599 was to not release the lock until after distributing the operation to the
* other members.
*
*/
@SuppressWarnings("serial")
public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {
// Soft-assertion rule used by checkForConsistencyErrors. The field is transient, so it is skipped
// whenever this test instance is serialized.
@Rule
public transient JUnitSoftAssertions softly = new JUnitSoftAssertions();
/*
* The tests perform a single operation and a concurrent clear.
*
* opsVM determines where the single operation will be performed; null will perform the op on the
* test VM (vm0). clearVM determines where the clear operation will be performed; null will perform
* the clear on the test VM (vm0).
*
* Specifying NULL/NULL for opsVM and clearVM has the effect of performing both in the same thread
* whereas specifying vm0/vm0 for example will run them both on the same vm, but different
* threads. NULL/NULL is not tested here since the same thread performing a clear prior to
* returning from a put is not possible using the public API.
*
* Each test is performed twice: once with the operation and clear on the same vm, and once on
* different vms.
*
*/
// vm0/vm1 are assigned by setupMembers/setupNoAckMembers; opsVM/clearVM are assigned by
// setOpAndClearVM and name where the operation and the clear are performed.
VM vm0, vm1, opsVM, clearVM;
// Cache assigned inside each VM by createCache/createNoConserveSocketsCache.
static Cache cache;
// Region under test, assigned inside each VM by createRegion/createNOACKRegion.
static LocalRegion region;
// Distributed member ids returned when the caches are created in vm0 and vm1.
DistributedMember vm0ID, vm1ID;
// Lock test hook most recently created by one of the set*Hook methods.
static AbstractRegionMap.ARMLockTestHook theHook;
// Key and value used by the single-entry operations; the bulk operations derive their keys and
// values from these.
static final String THE_KEY = "theKey";
static final String THE_VALUE = "theValue";
private static final Logger logger = LogService.getLogger();
// test methods
/**
 * Performs a PUT on vm0 while an {@link ArmBasicClearHook} installed on vm0 issues a clear on vm0
 * from its afterRelease callback, then checks that vm0 and vm1 hold consistent contents.
 */
@Test
public void testPutOperationSameVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  // first arg is where to perform operation, second arg where to perform clear
  setOpAndClearVM(vm0, vm0);
  try {
    opsVM.invoke(() -> setBasicHook(opsVM));
    runConsistencyTest(vm0, performPutOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUT on vm0 while an {@link ArmBasicClearHook} installed on vm0 issues a clear on vm1
 * from its afterRelease callback, then checks that vm0 and vm1 hold consistent contents.
 */
@Test
public void testPutOperationDifferentVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  // first arg is where to perform operation, second arg where to perform clear
  setOpAndClearVM(vm0, vm1);
  try {
    opsVM.invoke(() -> setBasicHook(clearVM));
    runConsistencyTest(vm0, performPutOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Runs the no-ack scenario: vm0 gets an {@link ArmNoAckLocalHook} targeting vm1, vm1 gets an
 * {@link ArmNoAckRemoteHook} targeting vm0, the step-1 latch is primed on vm0 and the step-2
 * latch on vm1 (both with a count of 1), and then the put/clear pair is executed on vm0. The
 * contents of both members are compared afterwards.
 */
@Test
public void testPutOperationNoAck() {
  // Setup runs outside the try block so that a setup failure is reported as-is instead of being
  // replaced by an exception thrown while resetting hooks that were never installed.
  setupNoAckMembers();
  setOpAndClearVM(vm0, vm0);
  try {
    vm0.invoke(() -> setLocalNoAckHook(vm1));
    vm1.invoke(() -> setRemoteNoAckHook(vm0));
    vm0.invoke(() -> primeStep1(1));
    vm1.invoke(() -> primeStep2(1));
    runConsistencyTest(vm0, performNoAckPutOperation);
    checkForConsistencyErrors();
  } finally {
    // Reset vm1 even when resetting vm0 fails, so a hook is never left behind on vm1.
    try {
      vm0.invoke(() -> resetHook());
    } finally {
      vm1.invoke(() -> resetHook());
    }
  }
}
/**
 * Performs a PUT followed by a REMOVE on vm0 while an {@link ArmRemoveAndInvalidateClearHook}
 * installed on vm0 issues a clear on vm0, then checks that vm0 and vm1 hold consistent contents.
 */
@Test
public void testRemoveOperationSameVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm0);
  try {
    opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
    runConsistencyTest(vm0, performRemoveOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUT followed by a REMOVE on vm0 while an {@link ArmRemoveAndInvalidateClearHook}
 * installed on vm0 issues a clear on vm1, then checks that vm0 and vm1 hold consistent contents.
 */
@Test
public void testRemoveOperationDifferentVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm1);
  try {
    opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
    runConsistencyTest(vm0, performRemoveOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUT followed by an INVALIDATE on vm0 while an
 * {@link ArmRemoveAndInvalidateClearHook} installed on vm0 issues a clear on vm0, then checks that
 * vm0 and vm1 hold consistent contents.
 */
@Test
public void testInvalidateOperationSameVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm0);
  try {
    opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
    runConsistencyTest(vm0, performInvalidateOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUT followed by an INVALIDATE on vm0 while an
 * {@link ArmRemoveAndInvalidateClearHook} installed on vm0 issues a clear on vm1, then checks that
 * vm0 and vm1 hold consistent contents.
 */
@Test
public void testInvalidateOperationDifferentVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm1);
  try {
    opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
    runConsistencyTest(vm0, performInvalidateOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUTALL on vm0 while an {@link ArmBulkClearHook} installed on vm0 issues a clear on
 * vm0 from its afterBulkRelease callback, then checks that vm0 and vm1 hold consistent contents.
 */
@Test
public void testPutAllOperationSameVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm0);
  try {
    opsVM.invoke(() -> setBulkHook(vm0));
    runConsistencyTest(vm0, performPutAllOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUTALL on vm0 while an {@link ArmBulkClearHook} installed on vm0 issues a clear from
 * its afterBulkRelease callback, then checks that vm0 and vm1 hold consistent contents.
 */
@Test
public void testPutAllOperationDifferentVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm1);
  try {
    // NOTE(review): the hook is pointed at vm0 even though clearVM is vm1, so the clear is
    // issued on vm0 here. The single-entry "DifferentVM" tests pass clearVM instead - confirm
    // whether vm0 is intentional before changing it.
    opsVM.invoke(() -> setBulkHook(vm0));
    runConsistencyTest(vm0, performPutAllOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUTALL followed by a REMOVEALL on vm0 while an {@link ArmBulkClearHook} installed on
 * vm0 issues a clear on vm0 from its afterBulkRelease callback, then checks that vm0 and vm1 hold
 * consistent contents.
 */
@Test
public void testRemoveAllOperationSameVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm0);
  try {
    opsVM.invoke(() -> setBulkHook(vm0));
    runConsistencyTest(vm0, performRemoveAllOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs a PUTALL followed by a REMOVEALL on vm0 while an {@link ArmBulkClearHook} installed on
 * vm0 issues a clear from its afterBulkRelease callback, then checks that vm0 and vm1 hold
 * consistent contents.
 */
@Test
public void testRemoveAllOperationDifferentVM() {
  // Setup runs outside the try block: if it fails, no hook has been installed yet and opsVM may
  // still be null, so running the cleanup would throw and hide the original failure.
  setupMembers();
  setOpAndClearVM(vm0, vm1);
  try {
    // NOTE(review): the hook is pointed at vm0 even though clearVM is vm1, so the clear is
    // issued on vm0 here. The single-entry "DifferentVM" tests pass clearVM instead - confirm
    // whether vm0 is intentional before changing it.
    opsVM.invoke(() -> setBulkHook(vm0));
    runConsistencyTest(vm0, performRemoveAllOperation);
    checkForConsistencyErrors();
  } finally {
    opsVM.invoke(() -> resetHook());
  }
}
/**
 * Performs the put either remotely or in the calling thread.
 *
 * @param whichVM the VM to run the put in, or null to run it directly in the current thread
 */
private void invokePut(VM whichVM) {
  if (whichVM != null) {
    whichVM.invoke(() -> doPut());
    return;
  }
  doPut();
}
/**
 * Performs the remove either remotely or in the calling thread.
 *
 * @param whichVM the VM to run the remove in, or null to run it directly in the current thread
 */
private void invokeRemove(VM whichVM) {
  if (whichVM != null) {
    whichVM.invoke(() -> doRemove());
    return;
  }
  doRemove();
}
/**
 * Performs the invalidate either remotely or in the calling thread.
 *
 * @param whichVM the VM to run the invalidate in, or null to run it directly in the current
 *        thread
 */
private void invokeInvalidate(VM whichVM) {
  if (whichVM != null) {
    whichVM.invoke(() -> doInvalidate());
    return;
  }
  doInvalidate();
}
/**
 * Performs the putAll either remotely or in the calling thread.
 *
 * @param whichVM the VM to run the putAll in, or null to run it directly in the current thread
 */
private void invokePutAll(VM whichVM) {
  if (whichVM != null) {
    whichVM.invoke(() -> doPutAll());
    return;
  }
  doPutAll();
}
/**
 * Performs the removeAll either remotely or in the calling thread.
 *
 * @param whichVM the VM to run the removeAll in, or null to run it directly in the current
 *        thread
 */
private void invokeRemoveAll(VM whichVM) {
  if (whichVM != null) {
    whichVM.invoke(() -> doRemoveAll());
    return;
  }
  doRemoveAll();
}
/**
 * Performs the region clear either remotely or in the calling thread. Static so that the lock
 * test hooks can call it.
 *
 * @param whichVM the VM to run the clear in, or null to run it directly in the current thread
 */
private static void invokeClear(VM whichVM) {
  if (whichVM != null) {
    whichVM.invoke(() -> doClear());
    return;
  }
  doClear();
}
// remote test methods
/**
 * Reports whether the region under test currently has a region entry for the key.
 *
 * @param key the key to look up
 * @return true when getRegionEntry returns a non-null entry for the key
 */
private static boolean doesRegionEntryExist(String key) {
  final Object entry = region.getRegionEntry(key);
  return entry != null;
}
/** Puts THE_VALUE under THE_KEY into the static region of the VM this runs in. */
private static void doPut() {
region.put(THE_KEY, THE_VALUE);
}
/** Removes THE_KEY from the static region of the VM this runs in. */
private static void doRemove() {
region.remove(THE_KEY);
}
/** Invalidates THE_KEY in the static region of the VM this runs in. */
private static void doInvalidate() {
region.invalidate(THE_KEY);
}
/**
 * Puts the generated key/value set into the region in a single putAll call, passing
 * "putAllCallback" as the callback argument.
 */
private static void doPutAll() {
  region.putAll(generateKeyValues(), "putAllCallback");
}
/**
 * Removes the keys of the generated key/value set from the region in a single removeAll call,
 * passing "removeAllCallback" as the callback argument.
 */
private static void doRemoveAll() {
  region.removeAll(generateKeyValues().keySet(), "removeAllCallback");
}
/** Clears the static region of the VM this runs in. */
private static void doClear() {
region.clear();
}
/**
 * Creates a fresh step-1 latch in the VM this runs in.
 *
 * @param cnt the count the latch starts with
 */
private static void primeStep1(int cnt) {
  step1Latch = new CountDownLatch(cnt);
}
/**
 * Creates a fresh step-2 latch in the VM this runs in.
 *
 * @param cnt the count the latch starts with
 */
private static void primeStep2(int cnt) {
  step2Latch = new CountDownLatch(cnt);
}
/** Counts the step-1 latch of the VM this runs in down by one. */
private static void releaseStep1() {
  step1Latch.countDown();
}
// Runnable that performs the put through invokePut(opsVM); any exception is turned into a test
// failure carrying the original exception as its cause.
SerializableRunnable performPutOperation = new SerializableRunnable("perform PUT") {
@Override
public void run() {
try {
invokePut(opsVM);
} catch (Exception e) {
fail("while performing PUT", e);
}
}
};
/**
 * Runnable for the no-ack scenario. It starts two threads in the VM it runs in: one performs the
 * put, the other waits on the step-1 latch and then performs the clear. Both threads call
 * {@code DistributedSystem.setThreadsSocketPolicy(false)} before their work and release the
 * thread's sockets afterwards. The runnable returns once both threads have finished.
 */
SerializableRunnable performNoAckPutOperation = new SerializableRunnable("perform NoAckPUT") {
  @Override
  public void run() throws InterruptedException {
    Runnable putTask = new Runnable() {
      @Override
      public void run() {
        DistributedSystem.setThreadsSocketPolicy(false);
        try {
          doPut();
        } finally {
          // Always release, even when the put throws, so the thread's sockets are not leaked.
          DistributedSystem.releaseThreadsSockets();
        }
      }
    };
    Runnable clearTask = new Runnable() {
      @Override
      public void run() {
        DistributedSystem.setThreadsSocketPolicy(false);
        try {
          // The clear must not start until the step-1 latch has been released.
          awaitStep1Latch();
          doClear();
        } finally {
          // Always release, even when the clear throws, so the thread's sockets are not leaked.
          DistributedSystem.releaseThreadsSockets();
        }
      }
    };
    Thread putThread = new Thread(putTask);
    Thread clearThread = new Thread(clearTask);
    // The clear thread is started first so that it is already waiting when the put begins.
    clearThread.start();
    putThread.start();
    putThread.join();
    clearThread.join();
  }
};
// Runnable that first puts the entry and then removes it, both through opsVM; any exception is
// turned into a test failure carrying the original exception as its cause.
SerializableRunnable performRemoveOperation = new SerializableRunnable("perform REMOVE") {
@Override
public void run() {
try {
// The entry is put first so that the remove has something to act on.
invokePut(opsVM);
invokeRemove(opsVM);
} catch (Exception e) {
fail("while performing REMOVE", e);
}
}
};
// Runnable that first puts the entry and then invalidates it, both through opsVM; any exception
// is turned into a test failure carrying the original exception as its cause.
SerializableRunnable performInvalidateOperation = new SerializableRunnable("perform INVALIDATE") {
@Override
public void run() {
try {
// The entry is put first so that the invalidate has something to act on.
invokePut(opsVM);
invokeInvalidate(opsVM);
} catch (Exception e) {
fail("while performing INVALIDATE", e);
}
}
};
// Runnable that performs the putAll through invokePutAll(opsVM); any exception is turned into a
// test failure carrying the original exception as its cause.
SerializableRunnable performPutAllOperation = new SerializableRunnable("perform PUTALL") {
@Override
public void run() {
try {
invokePutAll(opsVM);
} catch (Exception e) {
fail("while performing PUTALL", e);
}
}
};
// Runnable that first performs the putAll and then the removeAll, both through opsVM; any
// exception is turned into a test failure carrying the original exception as its cause.
SerializableRunnable performRemoveAllOperation = new SerializableRunnable("perform REMOVEALL") {
@Override
public void run() {
try {
// The entries are put first so that the removeAll has something to act on.
invokePutAll(opsVM);
invokeRemoveAll(opsVM);
} catch (Exception e) {
fail("while performing REMOVEALL", e);
}
}
};
// helper methods
/**
 * Records which VMs the test uses for the operation and for the clear.
 *
 * @param opsTarget stored in opsVM
 * @param clearTarget stored in clearVM
 */
private void setOpAndClearVM(VM opsTarget, VM clearTarget) {
opsVM = opsTarget;
clearVM = clearTarget;
}
/**
 * Resolves vm0 and vm1 from host 0, creates a cache in each of them (remembering the returned
 * member ids) and then creates the DISTRIBUTED_ACK region, named after the running test, in both.
 */
private void setupMembers() {
  final Host dunitHost = Host.getHost(0);
  vm0 = dunitHost.getVM(0);
  vm1 = dunitHost.getVM(1);

  vm0ID = createCache(vm0);
  vm1ID = createCache(vm1);

  final String regionName = getName();
  for (VM member : new VM[] {vm0, vm1}) {
    member.invoke(() -> createRegion(regionName));
  }
}
/**
 * Resolves vm0 and vm1 from host 0, creates a cache with conserve-sockets disabled in each of
 * them (remembering the returned member ids) and then creates the DISTRIBUTED_NO_ACK region,
 * named after the running test, in both.
 */
private void setupNoAckMembers() {
  final Host dunitHost = Host.getHost(0);
  vm0 = dunitHost.getVM(0);
  vm1 = dunitHost.getVM(1);

  vm0ID = createNoConserveSocketsCache(vm0);
  vm1ID = createNoConserveSocketsCache(vm1);

  final String regionName = getName();
  for (VM member : new VM[] {vm0, vm1}) {
    member.invoke(() -> createNOACKRegion(regionName));
  }
}
/**
 * Executes the given test runnable in the given VM.
 *
 * @param vm the VM to invoke the runnable in
 * @param theTest the runnable to invoke
 */
private void runConsistencyTest(VM vm, SerializableRunnableIF theTest) {
vm.invoke(theTest);
}
/**
 * Compares vm0 and vm1 after an operation/clear pair has run. For THE_KEY both the value and the
 * presence of a region entry must match. For the bulk keys ("-1" and "-2" suffixes) the values
 * must match when vm0 has the key; otherwise vm1 must not have it either. All checks are soft
 * assertions, so every mismatch is collected.
 */
private void checkForConsistencyErrors() {
  final Map<Object, Object> contentsVm0 = vm0.invoke(() -> getRegionContents());
  final Map<Object, Object> contentsVm1 = vm1.invoke(() -> getRegionContents());

  final String baseKey = THE_KEY;
  softly.assertThat(contentsVm1.get(baseKey))
      .as("region contents are not consistent for key %s", baseKey)
      .isEqualTo(contentsVm0.get(baseKey));

  // vm1 is queried before vm0, matching the order in which the values are needed below.
  final boolean entryOnVm1 = checkRegionEntry(vm1, baseKey);
  final boolean entryOnVm0 = checkRegionEntry(vm0, baseKey);
  softly.assertThat(entryOnVm1)
      .as("region entries are not consistent for key %s", baseKey)
      .isEqualTo(entryOnVm0);

  for (int suffix = 1; suffix <= 2; suffix++) {
    final String bulkKey = baseKey + "-" + suffix;
    if (!contentsVm0.containsKey(bulkKey)) {
      softly.assertThat(contentsVm1)
          .as("expected containsKey for %s to return false", bulkKey)
          .doesNotContainKey(bulkKey);
      continue;
    }
    softly.assertThat(contentsVm1.get(bulkKey))
        .as("region contents are not consistent for key %s", bulkKey)
        .isEqualTo(contentsVm0.get(bulkKey));
  }
}
/** Removes any lock test hook from the region map of the region in the VM this runs in. */
public void resetHook() {
  final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
  regionMap.setARMLockTestHook(null);
}
/**
 * Installs an {@link ArmBasicClearHook} on the region map of the region in the VM this runs in.
 *
 * @param whichVM stored in theOtherVM, the VM the hook issues the clear on
 */
public void setBasicHook(VM whichVM) {
  theOtherVM = whichVM;
  final AbstractRegionMap.ARMLockTestHook hook = new ArmBasicClearHook();
  theHook = hook;
  final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
  regionMap.setARMLockTestHook(hook);
}
/**
 * Installs an {@link ArmRemoveAndInvalidateClearHook} on the region map of the region in the VM
 * this runs in.
 *
 * @param whichVM stored in theOtherVM, the VM the hook issues the clear on
 */
public void setRemoveAndInvalidateHook(VM whichVM) {
  theOtherVM = whichVM;
  final AbstractRegionMap.ARMLockTestHook hook = new ArmRemoveAndInvalidateClearHook();
  theHook = hook;
  final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
  regionMap.setARMLockTestHook(hook);
}
/**
 * Installs an {@link ArmNoAckRemoteHook} on the region map of the region in the VM this runs in.
 *
 * @param whichVM stored in theOtherVM, the VM whose step-1 latch the hook releases
 */
public void setRemoteNoAckHook(VM whichVM) {
  theOtherVM = whichVM;
  final AbstractRegionMap.ARMLockTestHook hook = new ArmNoAckRemoteHook();
  theHook = hook;
  final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
  regionMap.setARMLockTestHook(hook);
}
/**
 * Installs an {@link ArmNoAckLocalHook} on the region map of the region in the VM this runs in.
 *
 * @param whichVM stored in theOtherVM, the VM whose step-2 latch the hook counts down
 */
public void setLocalNoAckHook(VM whichVM) {
  theOtherVM = whichVM;
  final AbstractRegionMap.ARMLockTestHook hook = new ArmNoAckLocalHook();
  theHook = hook;
  final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
  regionMap.setARMLockTestHook(hook);
}
/**
 * Installs an {@link ArmBulkClearHook} on the region map of the region in the VM this runs in.
 *
 * @param whichVM stored in theOtherVM, the VM the hook issues the clear on
 */
public void setBulkHook(VM whichVM) {
  theOtherVM = whichVM;
  final AbstractRegionMap.ARMLockTestHook hook = new ArmBulkClearHook();
  theHook = hook;
  final AbstractRegionMap regionMap = (AbstractRegionMap) region.entries;
  regionMap.setARMLockTestHook(hook);
}
/**
 * Creates a cache with "conserve-sockets" set to "true" inside the given VM and stores it in that
 * VM's static cache field.
 *
 * @param vm the VM to create the cache in
 * @return the distributed member reported by the VM's distributed system
 */
private InternalDistributedMember createCache(VM vm) {
  final SerializableCallable<Object> createTask = new SerializableCallable<Object>() {
    @Override
    public Object call() {
      final CacheFactory factory = new CacheFactory().set("conserve-sockets", "true");
      cache = getCache(factory);
      return getSystem().getDistributedMember();
    }
  };
  final Object member = vm.invoke(createTask);
  return (InternalDistributedMember) member;
}
/**
 * Creates a cache with "conserve-sockets" set to "false" inside the given VM and stores it in
 * that VM's static cache field.
 *
 * @param vm the VM to create the cache in
 * @return the distributed member reported by the VM's distributed system
 */
private InternalDistributedMember createNoConserveSocketsCache(VM vm) {
  final SerializableCallable<Object> createTask = new SerializableCallable<Object>() {
    @Override
    public Object call() {
      final CacheFactory factory = new CacheFactory().set("conserve-sockets", "false");
      cache = getCache(factory);
      return getSystem().getDistributedMember();
    }
  };
  final Object member = vm.invoke(createTask);
  return (InternalDistributedMember) member;
}
/**
 * Creates a REPLICATE region with concurrency checks enabled and DISTRIBUTED_ACK scope, and
 * stores it in the static region field.
 *
 * @param rgnName name of the region to create
 */
private static void createRegion(String rgnName) {
  final RegionFactory<Object, Object> factory =
      cache.createRegionFactory(RegionShortcut.REPLICATE);
  factory.setConcurrencyChecksEnabled(true);
  factory.setScope(Scope.DISTRIBUTED_ACK);

  final Region<Object, Object> created = factory.create(rgnName);
  region = (LocalRegion) created;
}
/**
 * Creates a REPLICATE region with concurrency checks enabled and DISTRIBUTED_NO_ACK scope, and
 * stores it in the static region field.
 *
 * @param rgnName name of the region to create
 */
private static void createNOACKRegion(String rgnName) {
  final RegionFactory<Object, Object> factory =
      cache.createRegionFactory(RegionShortcut.REPLICATE);
  factory.setConcurrencyChecksEnabled(true);
  factory.setScope(Scope.DISTRIBUTED_NO_ACK);

  final Region<Object, Object> created = factory.create(rgnName);
  region = (LocalRegion) created;
}
/**
 * Builds the entries used by the bulk operations: THE_KEY mapped to THE_VALUE, plus two more
 * entries whose key and value carry a "-1" and a "-2" suffix respectively.
 *
 * @return a new mutable map holding the three entries
 */
private static Map<Object, Object> generateKeyValues() {
  final Map<Object, Object> entries = new HashMap<>();
  entries.put(THE_KEY, THE_VALUE);
  for (int suffix = 1; suffix <= 2; suffix++) {
    entries.put(THE_KEY + "-" + suffix, THE_VALUE + "-" + suffix);
  }
  return entries;
}
/**
 * Copies every entry of the region in the VM this runs in into a plain map.
 *
 * @return a new map holding the key and value of each region entry
 */
@SuppressWarnings("rawtypes")
private static Map<Object, Object> getRegionContents() {
  final Map<Object, Object> snapshot = new HashMap<>();
  for (Object item : region.entrySet()) {
    final Region.Entry entry = (Region.Entry) item;
    snapshot.put(entry.getKey(), entry.getValue());
  }
  return snapshot;
}
/**
 * Asks the given VM whether its region has a region entry for the key.
 *
 * @param vm the VM to query
 * @param key the key to look up
 * @return the result of doesRegionEntryExist evaluated inside that VM
 */
private boolean checkRegionEntry(VM vm, String key) {
  return vm.invoke(() -> doesRegionEntryExist(key));
}
// VM used by the lock test hooks and the decrementRemoteStep*Latch helpers; assigned by the
// set*Hook methods.
static VM theOtherVM;
// Latches created by primeStep1Latch / primeStep2Latch in the VM where those methods run.
static transient CountDownLatch step1Latch, step2Latch;
/**
 * Replaces the step-1 latch with a new one.
 *
 * @param waitCount the count the new latch starts with
 */
public static void primeStep1Latch(int waitCount) {
step1Latch = new CountDownLatch(waitCount);
}
/**
 * Blocks until the step-1 latch reaches zero. If the waiting thread is interrupted the method
 * returns early with the thread's interrupt status set again, so callers can still observe the
 * interruption.
 */
public static void awaitStep1Latch() {
  try {
    step1Latch.await();
  } catch (InterruptedException e) {
    // Do not swallow the interrupt: restore the flag for whoever runs next on this thread.
    Thread.currentThread().interrupt();
  }
}
/** Counts the step-1 latch of the VM this runs in down by one. */
public static void decrementStep1Latch() {
step1Latch.countDown();
}
/** Counts down the step-1 latch inside theOtherVM by invoking decrementStep1Latch there. */
public static void decrementRemoteStep1Latch() {
theOtherVM.invoke(() -> decrementStep1Latch());
}
/**
 * Replaces the step-2 latch with a new one.
 *
 * @param waitCount the count the new latch starts with
 */
public static void primeStep2Latch(int waitCount) {
step2Latch = new CountDownLatch(waitCount);
}
/**
 * Blocks until the step-2 latch reaches zero. If the waiting thread is interrupted the method
 * returns early with the thread's interrupt status set again, so callers can still observe the
 * interruption.
 */
public static void awaitStep2Latch() {
  try {
    step2Latch.await();
  } catch (InterruptedException e) {
    // Do not swallow the interrupt: restore the flag for whoever runs next on this thread.
    Thread.currentThread().interrupt();
  }
}
/** Counts the step-2 latch of the VM this runs in down by one. */
public static void decrementStep2Latch() {
step2Latch.countDown();
}
/** Counts down the step-2 latch inside theOtherVM by invoking decrementStep2Latch there. */
public static void decrementRemoteStep2Latch() {
theOtherVM.invoke(() -> decrementStep2Latch());
}
/*
* Test callback class used to hook the rvv locking mechanism with basic operations.
*/
public static class ArmBasicClearHook extends ARMLockTestHookAdapter {
  /**
   * Issues a clear on theOtherVM when the event is a create on a region whose name starts with
   * "test"; every other event is ignored.
   */
  @Override
  public void afterRelease(InternalRegion owner, CacheEvent event) {
    if (!event.getOperation().isCreate()) {
      return;
    }
    if (!owner.getName().startsWith("test")) {
      return;
    }
    invokeClear(theOtherVM);
  }
}
/*
* Test callback class used to hook the rvv locking mechanism with remove and invalidate
* operations.
*/
public static class ArmRemoveAndInvalidateClearHook extends ARMLockTestHookAdapter {
  /**
   * Issues a clear on theOtherVM when the event is a destroy or an invalidate on a region whose
   * name starts with "test"; every other event is ignored.
   */
  @Override
  public void afterRelease(InternalRegion owner, CacheEvent event) {
    final boolean destroyOrInvalidate =
        event.getOperation().isDestroy() || event.getOperation().isInvalidate();
    if (!destroyOrInvalidate) {
      return;
    }
    if (!owner.getName().startsWith("test")) {
      return;
    }
    invokeClear(theOtherVM);
  }
}
/*
* Test callback class used to hook the rvv locking mechanism for NOACK testing.
*/
public static class ArmNoAckRemoteHook extends ARMLockTestHookAdapter {
  /**
   * For a remotely originated create on a region whose name starts with "test": releases the
   * step-1 latch in theOtherVM and then blocks on the local step-2 latch. Every other event is
   * ignored.
   */
  @Override
  public void beforeLock(InternalRegion owner, CacheEvent event) {
    if (!event.isOriginRemote()) {
      return;
    }
    if (!event.getOperation().isCreate()) {
      return;
    }
    if (!owner.getName().startsWith("test")) {
      return;
    }
    // start clear
    theOtherVM.invoke(() -> releaseStep1());
    // wait for clear to complete
    awaitStep2Latch();
  }
}
/*
 * Test callback class for NOACK testing that counts down the step-2 latch in theOtherVM.
 */
public static class ArmNoAckLocalHook extends ARMLockTestHookAdapter {
  /** Counts down the step-2 latch inside theOtherVM. */
  @Override
  public void beforeStateFlushWait() {
    theOtherVM.invoke(() -> decrementStep2Latch());
  }
}
/*
* Test callback class used to hook the rvv locking mechanism with bulk operations.
*/
public static class ArmBulkClearHook extends ARMLockTestHookAdapter {
  /**
   * Issues a clear on theOtherVM. The region argument is not inspected.
   */
  @Override
  public void afterBulkRelease(InternalRegion owner) {
    final VM target = theOtherVM;
    invokeClear(target);
  }
}
}