blob: 4c74e15ce71185eeffa23346106ff59654a8c859 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed;
import static java.lang.Boolean.TRUE;
import static java.lang.Long.MAX_VALUE;
import static java.lang.System.out;
import static java.lang.Thread.sleep;
import static org.apache.geode.distributed.DistributedLockService.getServiceNamed;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.ThreadUtils.dumpAllStacks;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.SystemFailure;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.locks.DLockGrantor;
import org.apache.geode.distributed.internal.locks.DLockRemoteToken;
import org.apache.geode.distributed.internal.locks.DLockRequestProcessor;
import org.apache.geode.distributed.internal.locks.DLockRequestProcessor.DLockRequestMessage;
import org.apache.geode.distributed.internal.locks.DLockRequestProcessor.DLockResponseMessage;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.locks.DLockToken;
import org.apache.geode.distributed.internal.locks.RemoteThread;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.StopWatch;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DLockTest;
/**
* This class tests distributed ownership via the DistributedLockService api.
*/
@Category({DLockTest.class})
public final class DistributedLockServiceDUnitTest extends JUnit4DistributedTestCase {
private static Logger logger = LogService.getLogger();
private static DistributedSystem dlstSystem;
private static DistributedLockBlackboard blackboard;
private static Object monitor = new Object();
private int hits = 0;
private int completes = 0;
private boolean done;
private boolean got;
/**
* Returns a previously created (or new, if this is the first time this method is called in this
* VM) distributed system which is somewhat configurable via hydra test parameters.
*/
@Override
public final void postSetUp() throws Exception {
createBlackboard();
Invoke.invokeInEveryVM(() -> createBlackboard());
// Create a DistributedSystem in every VM
connectDistributedSystem();
Invoke.invokeInEveryVM(() -> connectDistributedSystem());
}
private void createBlackboard() throws Exception {
if (blackboard == null) {
blackboard = DistributedLockBlackboardImpl.getInstance();
}
}
@Override
public final void preTearDown() {
Invoke.invokeInEveryVM(() -> destroyAllDLockServices());
}
@Override
public void postTearDown() {
disconnectAllFromDS();
}
public static void destroyAllDLockServices() {
DLockService.destroyAll();
dlstSystem = null;
}
/**
* Connects a DistributedSystem, saves it in static variable "system"
*/
protected static void connectDistributedSystem() {
dlstSystem = (new DistributedLockServiceDUnitTest()).getSystem();
}
private static volatile boolean stop_testFairness;
@Test
public void testFairness() throws InterruptedException, ExecutionException {
final int[] vmThreads = new int[] {1, 4, 8, 16};
final int numVms = vmThreads.length;
final int numThreads = Arrays.stream(vmThreads).sum();
final List<VM> vms = new ArrayList<>();
final List<Future> futures = new ArrayList<>();
final String serviceName = "testFairness_" + getUniqueName();
final Object lock = "lock";
// get the lock and hold it until all threads are ready to go
DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
assertThat(service.lock(lock, -1, -1)).isTrue();
// create the lock service in all vms
vms.addAll(forNumVMsInvoke(numVms, () -> remoteCreateService(serviceName)));
Thread.sleep(100);
Invoke.invokeInEveryVM(() -> {
stop_testFairness = false;
});
// line up threads for the fairness race...
for (int vm = 0; vm < numVms; vm++) {
logger.info("[testFairness] lining up " + vmThreads[vm] + " threads in vm " + vm);
for (int j = 0; j < vmThreads[vm]; j++) {
SerializableCallable<Integer> fairnessRunnable = new SerializableCallable<Integer>() {
@Override
public Integer call() {
// lock, inc count, and unlock until stop_testFairness is set true
final AtomicInteger lockCount = new AtomicInteger(0);
try {
DistributedLockService service =
DistributedLockService.getServiceNamed(serviceName);
while (!stop_testFairness) {
assertThat(service.lock(lock, -1, -1)).isTrue();
lockCount.incrementAndGet();
service.unlock(lock);
}
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
logger.warn(t);
fail(t.getMessage());
} finally {
return lockCount.get();
}
}
};
futures.add(vms.get(vm).invokeAsync(() -> fairnessRunnable.call()));
}
}
// wait for all threads to be ready to start the race
for (int i = 0; i < numVms; i++) {
final int vmId = i;
vms.get(vmId).invoke(() -> {
DLockService localService =
(DLockService) DistributedLockService.getServiceNamed(serviceName);
await()
.untilAsserted(() -> assertThat(localService.getStats().getLockWaitsInProgress())
.isEqualTo(vmThreads[vmId]));
});
}
// start the race!
service.unlock(lock);
Thread.sleep(1000 * 5); // 5 seconds
// stop the race...
assertThat(service.lock(lock, -1, -1)).isTrue();
for (VM vm : vms) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
stop_testFairness = true;
}
});
}
// release the lock and destroy the lock service
service.unlock(lock);
// calc total locks granted...
Integer totalLocks = 0;
Integer maxLocks = 0;
Integer minLocks = Integer.MAX_VALUE;
for (Future future : futures) {
Integer numLocks = (Integer) future.get();
totalLocks += numLocks;
if (minLocks > numLocks) {
minLocks = numLocks;
}
if (maxLocks < numLocks) {
maxLocks = numLocks;
}
}
vms.forEach(vm -> vm.invoke(() -> DistributedLockService.destroy(serviceName)));
logger.info("[testFairness] totalLocks=" + totalLocks + " minLocks="
+ minLocks + " maxLocks=" + maxLocks);
int expectedLocks = (totalLocks / numThreads) + 1;
int deviation = (int) (expectedLocks * 0.3);
int lowThreshold = expectedLocks - deviation;
int highThreshold = expectedLocks + deviation;
logger.info("[testFairness] deviation=" + deviation + " expectedLocks="
+ expectedLocks + " lowThreshold=" + lowThreshold + " highThreshold=" + highThreshold);
assertThat(minLocks >= lowThreshold).withFailMessage("minLocks is less than lowThreshold")
.isTrue();
assertThat(maxLocks <= highThreshold).withFailMessage("maxLocks is greater than highThreshold")
.isTrue();
}
@Test
public void testOneGetsAndOthersTimeOut() throws Exception {
doOneGetsAndOthersTimeOut(1, 1);
doOneGetsAndOthersTimeOut(4, 3);
}
private static InternalDistributedMember getLockGrantor(String serviceName) {
DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName);
assertThat(service).isNotNull();
InternalDistributedMember grantor = service.getLockGrantorId().getLockGrantorMember();
assertThat(grantor).isNotNull();
System.out.println("In identifyLockGrantor - grantor is " + grantor);
return grantor;
}
private static Boolean isLockGrantor(String serviceName) {
DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName);
assertThat(service).isNotNull();
Boolean result = service.isLockGrantor();
System.out.println("In isLockGrantor: " + result);
return result;
}
protected static void becomeLockGrantor(String serviceName) {
DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName);
assertThat(service).isNotNull();
System.out.println("About to call becomeLockGrantor...");
service.becomeLockGrantor();
}
@Test
public void testGrantorSelection() {
// TODO change distributedCreateService usage to be concurrent threads
final String serviceName = "testGrantorSelection_" + getUniqueName();
final List<VM> vmList = distributedCreateService(4, serviceName, true);
getLockGrantor(serviceName);
assertGrantorIsConsistent(serviceName, vmList);
}
@Test
public void testBasicGrantorRecovery() {
int numVMs = 4;
final String serviceName = "testBasicGrantorRecovery_" + getUniqueName();
final List<VM> vmList = distributedCreateService(numVMs, serviceName, false);
// Apparently it's necessary to query the lock grantor to have a server be nominated grantor.
vmList.get(0).invoke(() -> getLockGrantor(serviceName));
VM originalGrantorVM = identifyLockGrantorWithSanityCheck(serviceName, vmList);
VM lockHolder = null;
for (VM vm : vmList) {
if (vm != originalGrantorVM) {
lockHolder = vm;
vm.invoke(() -> DLockService.getServiceNamed(serviceName).lock("foo", 1200, -1));
break;
}
}
final InternalDistributedMember originalGrantorMember =
originalGrantorVM.invoke(() -> getLockGrantor(serviceName));
System.out.println("originalGrantorMember = " + originalGrantorMember);
final InternalDistributedMember originalGrantorSelfReportedMember = originalGrantorVM
.invoke(() -> InternalDistributedSystem.getAnyInstance().getDistributedMember());
assertThat(originalGrantorMember).isEqualTo(originalGrantorSelfReportedMember);
originalGrantorVM.invoke(() -> disconnectFromDS());
vmList.remove(originalGrantorVM);
await("vm0 leave has been processed")
.until(() -> vmList.parallelStream()
.noneMatch(vm -> vm.invoke(() -> InternalDistributedSystem
.getAnyInstance()
.getAllOtherMembers()
.contains(originalGrantorMember))));
// It seems that we need to actually use the lock service to have a new grantor be picked.
lockHolder.invoke(() -> DLockService.getServiceNamed(serviceName).unlock("foo"));
final VM newGrantorMember = identifyLockGrantorWithSanityCheck(serviceName, vmList);
assertThat(newGrantorMember).isNotEqualTo(originalGrantorMember);
}
/**
* This is just a better-sounding wrapper for when we want to do only consistency checks.
*/
private static void assertGrantorIsConsistent(String serviceName, Collection<VM> vmList) {
identifyLockGrantorWithSanityCheck(serviceName, vmList);
}
private static VM identifyLockGrantorWithSanityCheck(String serviceName, Collection<VM> vmList) {
VM grantorVM = null;
for (VM vm : vmList) {
if (vm.invoke(() -> isLockGrantor(serviceName))) {
grantorVM = vm;
}
}
assertThat(grantorVM).withFailMessage("Some VM should think it's the lock grantor").isNotNull();
final DistributedMember grantorID =
grantorVM.invoke(() -> InternalDistributedSystem.getAnyInstance().getDistributedMember());
for (VM vm : vmList) {
if (vm != grantorVM) {
assertThat(vm.invoke(() -> isLockGrantor(serviceName))).isFalse();
}
assertThat(vm.invoke(() -> getLockGrantor(serviceName))).isEqualTo(grantorID);
}
return grantorVM;
}
@Test
public void testLockFailover() {
final int originalGrantorVM = 0;
final int oneVM = 1;
final int twoVM = 2;
final String serviceName = "testLockFailover-" + getUniqueName();
// create lock services...
if (logger.isDebugEnabled()) {
logger.debug("[testLockFailover] create services");
}
VM.getVM(originalGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName));
VM.getVM(oneVM)
.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName));
VM.getVM(twoVM)
.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName));
VM.getVM(originalGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName));
Boolean isGrantor = VM.getVM(originalGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName));
assertThat(isGrantor)
.withFailMessage("First member calling getLockGrantor failed to become grantor")
.isEqualTo(Boolean.TRUE);
// get locks...
if (logger.isDebugEnabled()) {
logger.debug("[testLockFailover] get lock");
}
Boolean locked = VM.getVM(originalGrantorVM).invoke(
() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + originalGrantorVM));
assertThat(locked).withFailMessage("Failed to get lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
locked = VM.getVM(twoVM)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + twoVM));
assertThat(locked).withFailMessage("Failed to get lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
locked = VM.getVM(oneVM)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + oneVM));
assertThat(locked).withFailMessage("Failed to get lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
// disconnect originalGrantorVM...
if (logger.isDebugEnabled()) {
logger.debug("[testLockFailover] disconnect originalGrantorVM");
}
VM.getVM(originalGrantorVM).invoke(new SerializableRunnable() {
@Override
public void run() {
disconnectFromDS();
}
});
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
fail("interrupted");
}
// verify locks by unlocking...
if (logger.isDebugEnabled()) {
logger.debug("[testLockFailover] release locks");
}
Boolean unlocked = VM.getVM(twoVM)
.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + twoVM));
assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
unlocked = VM.getVM(oneVM)
.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + oneVM));
assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
// switch locks...
locked = VM.getVM(oneVM)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + twoVM));
assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
locked = VM.getVM(twoVM)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + oneVM));
assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
unlocked = VM.getVM(oneVM)
.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + twoVM));
assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
unlocked = VM.getVM(twoVM)
.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + oneVM));
assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover")
.isEqualTo(Boolean.TRUE);
// verify grantor is unique...
if (logger.isDebugEnabled()) {
logger.debug("[testLockFailover] verify grantor identity");
}
InternalDistributedMember oneID = VM.getVM(oneVM)
.invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName));
InternalDistributedMember twoID = VM.getVM(twoVM)
.invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName));
assertThat(oneID != null && twoID != null)
.withFailMessage("Failed to identifyLockGrantor in testLockFailover").isTrue();
assertThat(twoID).withFailMessage("Failed grantor uniqueness in testLockFailover")
.isEqualTo(oneID);
}
@Test
public void testLockThenBecomeLockGrantor() {
final int originalGrantorVM = 0;
final int becomeGrantorVM = 1;
final int thirdPartyVM = 2;
final String serviceName = "testLockThenBecomeLockGrantor-" + getUniqueName();
// create lock services...
if (logger.isDebugEnabled()) {
logger.debug("[testLockThenBecomeLockGrantor] create services");
}
VM.getVM(originalGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName));
try {
Thread.sleep(20);
} catch (InterruptedException ignore) {
fail("interrupted");
}
VM.getVM(becomeGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName));
VM.getVM(thirdPartyVM)
.invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName));
VM.getVM(originalGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName));
Boolean isGrantor = VM.getVM(originalGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName));
assertThat(isGrantor).isEqualTo(Boolean.TRUE)
.withFailMessage("First member calling getLockGrantor failed to become grantor");
// control...
if (logger.isDebugEnabled()) {
logger.debug("[testLockThenBecomeLockGrantor] check control");
}
Boolean check = VM.getVM(becomeGrantorVM).invoke(
() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + becomeGrantorVM));
assertThat(check).isEqualTo(Boolean.FALSE)
.withFailMessage("Check of control failed... unlock succeeded but nothing locked");
// get locks...
if (logger.isDebugEnabled()) {
logger.debug("[testLockThenBecomeLockGrantor] get lock");
}
Boolean locked = VM.getVM(originalGrantorVM).invoke(
() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + originalGrantorVM));
assertThat(locked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to get lock in testLockThenBecomeLockGrantor");
locked = VM.getVM(thirdPartyVM)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + thirdPartyVM));
assertThat(locked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to get lock in testLockThenBecomeLockGrantor");
locked = VM.getVM(becomeGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + becomeGrantorVM));
assertThat(locked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to get lock in testLockThenBecomeLockGrantor");
// become lock grantor...
if (logger.isDebugEnabled()) {
logger.debug("[testLockThenBecomeLockGrantor] become lock grantor");
}
VM.getVM(becomeGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.becomeLockGrantor(serviceName));
try {
Thread.sleep(20);
} catch (InterruptedException ignore) {
fail("interrupted");
}
isGrantor = VM.getVM(becomeGrantorVM)
.invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName));
assertThat(isGrantor).isEqualTo(Boolean.TRUE).withFailMessage("Failed to become lock grantor");
// verify locks by unlocking...
if (logger.isDebugEnabled()) {
logger.debug("[testLockThenBecomeLockGrantor] release locks");
}
Boolean unlocked = VM.getVM(originalGrantorVM).invoke(
() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + originalGrantorVM));
assertThat(unlocked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to release lock in testLockThenBecomeLockGrantor");
unlocked = VM.getVM(thirdPartyVM)
.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + thirdPartyVM));
assertThat(unlocked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to release lock in testLockThenBecomeLockGrantor");
unlocked = VM.getVM(becomeGrantorVM).invoke(
() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + becomeGrantorVM));
assertThat(unlocked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to release lock in testLockThenBecomeLockGrantor");
// test for bug in which transferred token gets re-entered causing lock recursion
unlocked = VM.getVM(becomeGrantorVM).invoke(
() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + becomeGrantorVM));
assertThat(unlocked).isEqualTo(Boolean.FALSE)
.withFailMessage("Transfer of tokens caused lock recursion in held lock");
}
@Test
public void testBecomeLockGrantor() {
// create lock services...
int numVMs = 4;
final String serviceName = "testBecomeLockGrantor-" + getUniqueName();
distributedCreateService(numVMs, serviceName, true);
// each one gets a lock...
for (int vm = 0; vm < numVMs; vm++) {
final int finalvm = vm;
Boolean locked = VM.getVM(finalvm)
.invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "obj-" + finalvm));
assertThat(locked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to get lock in testBecomeLockGrantor");
}
// find the grantor...
int originalVM = -1;
InternalDistributedMember oldGrantor = null;
for (int vm = 0; vm < numVMs; vm++) {
Boolean isGrantor = VM.getVM(vm)
.invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName));
if (isGrantor) {
originalVM = vm;
oldGrantor = VM.getVM(vm)
.invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(
serviceName));
break;
}
}
if (logger.isDebugEnabled()) {
logger.debug("[testBecomeLockGrantor] original grantor is " + oldGrantor);
}
// have one call becomeLockGrantor
for (int vm = 0; vm < numVMs; vm++) {
if (vm != originalVM) {
VM.getVM(vm)
.invoke(() -> DistributedLockServiceDUnitTest.becomeLockGrantor(serviceName));
Boolean isGrantor = VM.getVM(vm)
.invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName));
assertThat(isGrantor).isEqualTo(Boolean.TRUE)
.withFailMessage("isLockGrantor is false after calling becomeLockGrantor");
break;
}
}
if (logger.isDebugEnabled()) {
logger.debug("[testBecomeLockGrantor] one vm has called becomeLockGrantor...");
}
InternalDistributedMember newGrantor = null;
for (int vm = 0; vm < numVMs; vm++) {
Boolean isGrantor = VM.getVM(vm)
.invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName));
if (isGrantor) {
newGrantor = VM.getVM(vm)
.invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(
serviceName));
break;
}
}
assertThat(newGrantor).isNotEqualTo(oldGrantor);
// verify locks still held by unlocking
// each one unlocks...
for (int vm = 0; vm < numVMs; vm++) {
final int finalvm = vm;
Boolean unlocked = VM.getVM(finalvm)
.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "obj-" + finalvm));
assertThat(unlocked).isEqualTo(Boolean.TRUE)
.withFailMessage("Failed to unlock in testBecomeLockGrantor");
}
if (logger.isDebugEnabled()) {
logger.debug("[testBecomeLockGrantor] finished");
}
// verify that pending requests are granted by unlocking them also
}
@Test
public void testOnlyOneVmAcquiresWithTryLock() throws Exception {
final Long waitMillis = 100L;
// create lock services...
if (logger.isDebugEnabled()) {
logger.debug("[testTryLock] create lock services");
}
final String serviceName = "testTryLock-" + getUniqueName();
final List<VM> vms = distributedCreateService(4, serviceName, false);
final List<AsyncInvocation<Boolean>> invocations = vms.stream()
.map(vm -> vm.invokeAsync(
() -> DistributedLockServiceDUnitTest.tryLock(serviceName, "KEY", waitMillis)))
.collect(Collectors
.toList());
int lockCount = 0;
for (AsyncInvocation<Boolean> invocation : invocations) {
if (invocation.get()) {
lockCount++;
}
}
assertThat(lockCount).isEqualTo(1)
.withFailMessage("More than one vm acquired the tryLock");
if (logger.isDebugEnabled()) {
logger.debug("[testTryLock] unlock tryLock");
}
int unlockCount = 0;
for (VM vm : vms) {
Boolean unlocked =
vm.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY"));
if (unlocked) {
unlockCount++;
}
}
assertThat(unlockCount).withFailMessage("More than one vm unlocked the tryLock").isEqualTo(1);
}
@Test
public void testOneGetsThenOtherGets() throws Exception { // (numVMs, numThreadsPerVM)
doOneGetsThenOtherGets(1, 1);
// doOneGetsThenOtherGets(2, 2);
// doOneGetsThenOtherGets(3, 3);
doOneGetsThenOtherGets(4, 3);
}
@Test
public void testLockDifferentNames() {
String serviceName = getUniqueName();
// Same VM
remoteCreateService(serviceName);
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(service.lock("obj1", -1, -1)).isTrue();
assertThat(service.lock("obj2", -1, -1)).isTrue();
service.unlock("obj1");
service.unlock("obj2");
// Different VMs
VM vm = VM.getVM(0);
vm.invoke(() -> remoteCreateService(serviceName));
assertThat(service.lock("masterVMobj", -1, -1)).isTrue();
assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, "otherVMobj", -1, 0))).isTrue();
service.unlock("masterVMobj");
}
@Test
public void testLocalGetLockAndIncrement() throws Exception {
String serviceName = getUniqueName();
remoteCreateService(serviceName);
DistributedLockService.getServiceNamed(serviceName);
assertThat(getLockAndIncrement(serviceName, "localVMobj", -1, 0)).isTrue();
}
@Test
public void testRemoteGetLockAndIncrement() {
String serviceName = getUniqueName();
VM vm = VM.getVM(0);
vm.invoke(() -> remoteCreateService(serviceName));
assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, "remoteVMobj",
-1, 0))).isTrue();
}
@Test
public void testLockSameNameDifferentService() {
String serviceName1 = getUniqueName() + "_1";
String serviceName2 = getUniqueName() + "_2";
String objName = "obj";
// Same VM
remoteCreateService(serviceName1);
remoteCreateService(serviceName2);
DistributedLockService service1 = DistributedLockService.getServiceNamed(serviceName1);
DistributedLockService service2 = DistributedLockService.getServiceNamed(serviceName2);
assertThat(service1.lock(objName, -1, -1)).isTrue();
assertThat(service2.lock(objName, -1, -1)).isTrue();
service1.unlock(objName);
service2.unlock(objName);
// Different VMs
VM vm = VM.getVM(0);
vm.invoke(() -> remoteCreateService(serviceName1));
vm.invoke(() -> remoteCreateService(serviceName2));
assertThat(service1.lock(objName, -1, -1)).isTrue();
assertThat(vm.invoke(() -> getLockAndIncrement(serviceName2, objName, -1, 0))).isTrue();
service1.unlock(objName);
}
@Test
public void testLeaseDoesntExpire() {
String serviceName = getUniqueName();
final Object objName = 3;
// Same VM
remoteCreateService(serviceName);
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
// lock objName with a sufficiently long lease
assertThat(service.lock(objName, -1, 60000)).isTrue();
// try to lock in another thread, with a timeout shorter than above lease
final boolean[] resultHolder = new boolean[] {false};
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
resultHolder[0] = !service.lock(objName, 1000, -1);
}
});
thread.start();
ThreadUtils.join(thread, 30 * 1000);
assertThat(resultHolder[0]).isTrue();
// the unlock should succeed without throwing LeaseExpiredException
service.unlock(objName);
// Different VM
VM vm = VM.getVM(0);
vm.invoke(() -> remoteCreateService(serviceName));
// lock objName in this VM with a sufficiently long lease
assertThat(service.lock(objName, -1, 60000)).isTrue();
// try to lock in another VM, with a timeout shorter than above lease
assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, objName, 1000L, 0L))).isFalse();
// the unlock should succeed without throwing LeaseExpiredException
service.unlock(objName);
}
@Test
public void testLockUnlock() {
String serviceName = getUniqueName();
Object objName = 42;
remoteCreateService(serviceName);
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
service.lock(objName, -1, -1);
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
service.unlock(objName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
}
@Test
public void testLockExpireUnlock() throws Exception {
long leaseMs = 200;
long waitBeforeLockingMs = 210;
String serviceName = getUniqueName();
Object objName = 42;
remoteCreateService(serviceName);
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
Thread.sleep(waitBeforeLockingMs); // should expire...
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
Assertions.assertThatThrownBy(() -> service.unlock(objName))
.isInstanceOf(LeaseExpiredException.class);
}
@Test
public void testLockRecursion() {
String serviceName = getUniqueName();
Object objName = 42;
remoteCreateService(serviceName);
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
// initial lock...
assertThat(service.lock(objName, -1, -1)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// recursion +1...
assertThat(service.lock(objName, -1, -1)).isTrue();
// recursion -1...
service.unlock(objName);
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// and unlock...
service.unlock(objName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
}
@Test
public void testLockRecursionWithExpiration() throws Exception {
long leaseMs = 500;
long waitBeforeLockingMs = 750;
String serviceName = getUniqueName();
Object objName = 42;
remoteCreateService(serviceName);
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
// initial lock...
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// recursion +1...
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// expire...
Thread.sleep(waitBeforeLockingMs);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
// should fail...
try {
service.unlock(objName);
fail("unlock should have thrown LeaseExpiredException");
} catch (LeaseExpiredException ex) {
}
// relock it...
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// and unlock to verify no recursion...
service.unlock(objName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); // throws failure!!
// go thru again in different order...
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
// initial lock...
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// expire...
Thread.sleep(waitBeforeLockingMs);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
// relock it...
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
assertThat(service.isHeldByCurrentThread(objName)).isTrue();
// and unlock to verify no recursion...
service.unlock(objName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
}
@Test
public void testLeaseExpiresBeforeOtherLocks() throws InterruptedException {
leaseExpiresTest(false);
}
@Test
public void testLeaseExpiresWhileOtherLocks() throws InterruptedException {
leaseExpiresTest(true);
}
private void leaseExpiresTest(boolean tryToLockBeforeExpiration) throws InterruptedException {
long leaseMs = 100;
long waitBeforeLockingMs = tryToLockBeforeExpiration ? 50 : 110;
final String serviceName = getUniqueName();
final Object objName = 3;
// Same VM
remoteCreateService(serviceName);
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
// lock objName with a short lease
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
Thread.sleep(waitBeforeLockingMs);
if (waitBeforeLockingMs > leaseMs) {
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
}
// try to lock in another thread - lease should have expired
final boolean[] resultHolder = new boolean[] {false};
Thread thread = new Thread(() -> {
resultHolder[0] = service.lock(objName, -1, -1);
service.unlock(objName);
assertThat(!service.isHeldByCurrentThread(objName)).isTrue();
});
thread.start();
ThreadUtils.join(thread, 30 * 1000);
assertThat(resultHolder[0]).isTrue();
// this thread's unlock should throw LeaseExpiredException
Assertions.assertThatThrownBy(() -> {
service.unlock(objName);
}).isInstanceOf(LeaseExpiredException.class);
VM vm = VM.getVM(0);
vm.invoke(() -> remoteCreateService(serviceName));
// lock objName in this VM with a short lease
assertThat(service.lock(objName, -1, leaseMs)).isTrue();
Thread.sleep(waitBeforeLockingMs);
if (logger.isDebugEnabled()) {
logger.debug("[testLeaseExpires] succeed lock in other vm");
}
// try to lock in another VM - should succeed
assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, objName, (long) -1, 0L)))
.isEqualTo(TRUE);
if (logger.isDebugEnabled()) {
logger.debug("[testLeaseExpires] unlock should throw LeaseExpiredException again");
}
// this VMs unlock should throw LeaseExpiredException
Assertions.assertThatThrownBy(() -> {
service.unlock(objName);
}).isInstanceOf(LeaseExpiredException.class);
}
@Test
public void testSuspendLockingAfterExpiration() throws Exception {
final long leaseMillis = 100;
final long suspendWaitMillis = 10000;
final String serviceName = getUniqueName();
final Object key = 3;
// controller locks key and then expires - controller is grantor
DistributedLockService dls = DistributedLockService.create(serviceName, getSystem());
assertThat(dls.lock(key, -1, leaseMillis)).isTrue();
// wait for expiration
Thread.sleep(leaseMillis * 2);
Assertions.assertThatThrownBy(() -> dls.unlock(key)).isInstanceOf(LeaseExpiredException.class);
// other vm calls suspend
if (logger.isDebugEnabled()) {
logger.debug("[leaseExpiresThenSuspendTest] call to suspend locking");
}
VM.getVM(0).invoke(() -> {
final DistributedLockService dlock =
DistributedLockService.create(serviceName, getSystem());
dlock.suspendLocking(suspendWaitMillis);
dlock.resumeLocking();
assertThat(dlock.lock(key, -1, leaseMillis)).isTrue();
dlock.unlock(key);
});
}
private volatile boolean started = false;
private volatile boolean gotLock = false;
volatile Throwable exception = null;
private volatile Throwable throwable = null;
@Test
public void testLockInterruptiblyIsInterruptible() {
started = false;
gotLock = false;
exception = null;
throwable = null;
// Get a lock in first thread
final String serviceName = getUniqueName();
final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
service.becomeLockGrantor();
assertThat(service.lock("obj", 1000, -1)).isTrue();
// Start second thread that tries to lock in second thread
Thread thread2 = new Thread(() -> {
try {
started = true;
gotLock = service.lockInterruptibly("obj", -1, -1);
} catch (InterruptedException ex) {
exception = ex;
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
throwable = t;
}
});
thread2.start();
// Interrupt second thread
while (!started)
Thread.yield();
thread2.interrupt();
ThreadUtils.join(thread2, 20 * 1000);
// Expect it got InterruptedException and didn't lock the service
assertThat(gotLock).isFalse();
if (throwable != null) {
logger.warn("testLockInterruptiblyIsInterruptible threw unexpected Throwable", throwable);
}
assertThat(exception).isNotNull();
// Unlock "obj" in first thread
service.unlock("obj");
// Make sure it didn't get locked by second thread
logger.info(
"[testLockInterruptiblyIsInterruptible] try to get lock with timeout should not fail");
assertThat(service.lock("obj", 5000, -1)).isTrue();
DistributedLockService.destroy(serviceName);
}
private volatile boolean wasFlagSet = false;
@Test
public void testLockIsNotInterruptible() throws Exception {
// Lock entire service in first thread
if (logger.isDebugEnabled()) {
logger.debug("[testLockIsNotInterruptible] lock in first thread");
}
started = false;
gotLock = false;
exception = null;
wasFlagSet = false;
final String serviceName = getUniqueName();
final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
assertThat(service.lock("obj", 1000, -1)).isTrue();
// Start second thread that tries to lock in second thread
if (logger.isDebugEnabled()) {
logger.debug("[testLockIsNotInterruptible] attempt lock in second thread");
}
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
started = true;
gotLock = service.lock("obj", -1, -1);
if (logger.isDebugEnabled()) {
logger.debug("[testLockIsNotInterruptible] thread2 finished lock() - got " + gotLock);
}
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable ex) {
logger.warn("[testLockIsNotInterruptible] Caught...", ex);
exception = ex;
}
wasFlagSet = Thread.currentThread().isInterrupted();
}
});
thread2.start();
// Interrupt second thread
if (logger.isDebugEnabled()) {
logger.debug("[testLockIsNotInterruptible] interrupt second thread");
}
while (!started)
Thread.yield();
Thread.sleep(500);
thread2.interrupt();
// Expect it didn't get an exception and didn't lock the service
Thread.sleep(500);
assertThat(gotLock).isFalse();
assertThat(exception).isNull();
// Unlock entire service in first thread
if (logger.isDebugEnabled()) {
logger.debug("[testLockIsNotInterruptible] unlock in first thread");
}
service.unlock("obj");
Thread.sleep(500);
// Expect that thread2 should now complete execution.
ThreadUtils.join(thread2, 20 * 1000);
// Now thread2 should have gotten the lock, not the exception, but the
// thread's flag should be set
if (logger.isDebugEnabled()) {
logger.debug("[testLockIsNotInterruptible] verify second thread got lock");
}
assertThat(exception).isNull();
assertThat(gotLock).isTrue();
assertThat(wasFlagSet).isTrue();
}
/**
* Test DistributedLockService.acquireExclusiveLocking(), releaseExclusiveLocking()
*/
@Test
public void testSuspendLockingBasic() {
final DistributedLockService service =
DistributedLockService.create(getUniqueName(), dlstSystem);
try {
service.resumeLocking();
fail("Didn't throw LockNotHeldException");
} catch (LockNotHeldException ex) {
// expected
}
assertThat(service.suspendLocking(-1)).isTrue();
service.resumeLocking();
// It's not reentrant
assertThat(service.suspendLocking(1000)).isTrue();
try {
service.suspendLocking(1);
fail("didn't get IllegalStateException");
} catch (IllegalStateException ex) {
// expected
}
service.resumeLocking();
// Get "false" if another thread is holding it
Thread thread = new Thread(() -> {
System.out.println("new thread about to suspendLocking()");
assertThat(service.suspendLocking(1000)).isTrue();
});
thread.start();
ThreadUtils.join(thread, 30 * 1000);
System.out.println("main thread about to suspendLocking");
assertThat(!service.suspendLocking(1000)).isTrue();
}
/**
* Test that exlusive locking prohibits locking activity
*/
@Test
public void testSuspendLockingProhibitsLocking() {
final String name = getUniqueName();
distributedCreateService(2, name, true);
DistributedLockService service = DistributedLockService.getServiceNamed(name);
// Should be able to lock from other VM
VM vm1 = VM.getVM(1);
assertThat(vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock(name))).isTrue();
assertThat(service.suspendLocking(1000)).isTrue();
// vm1 is the grantor... use debugHandleSuspendTimeouts
vm1.invoke(new SerializableRunnable("setDebugHandleSuspendTimeouts") {
@Override
public void run() {
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(name);
assertThat(dls.isLockGrantor()).isTrue();
DLockGrantor grantor = dls.getGrantorWithNoSync();
grantor.setDebugHandleSuspendTimeouts(5000);
}
});
// Shouldn't be able to lock a name from another VM
assertThat(!vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock(name))).isTrue();
service.resumeLocking();
vm1.invoke(new SerializableRunnable("unsetDebugHandleSuspendTimeouts") {
@Override
public void run() {
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(name);
assertThat(dls.isLockGrantor()).isTrue();
DLockGrantor grantor = dls.getGrantorWithNoSync();
grantor.setDebugHandleSuspendTimeouts(0);
}
});
// Should be able to lock again
assertThat(vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock(name))).isTrue();
}
/**
* Test that suspend locking behaves under various usage patterns. This ensures that suspend and
* regular locks behave as ReadWriteLocks and processing occurs in order.
*/
@Test
public void testSuspendLockingBehaves() throws Exception {
try {
doTestSuspendLockingBehaves();
} finally {
Invoke.invokeInEveryVM(new SerializableRunnable() {
@Override
public void run() {
try {
if (suspendClientSuspendLockingBehaves != null) {
suspendClientSuspendLockingBehaves.stop();
suspendClientSuspendLockingBehaves = null;
}
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
logger.error("Error in testSuspendLockingBehaves finally", t);
}
try {
if (lockClientSuspendLockingBehaves != null) {
lockClientSuspendLockingBehaves.stop();
lockClientSuspendLockingBehaves = null;
}
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
logger.error("Error in testSuspendLockingBehaves finally", t);
}
}
});
}
}
private void doTestSuspendLockingBehaves() {
final String dlsName = getUniqueName();
final VM vmGrantor = VM.getVM(0);
final VM vmOne = VM.getVM(1);
final VM vmTwo = VM.getVM(2);
final VM vmThree = VM.getVM(3);
final String key1 = "key1";
// TODO: make sure suspend thread can get other locks
// TODO: test local (in grantor) locks and suspends also
// define some SerializableRunnables
final SerializableRunnable createDLS = new SerializableRunnable("Create " + dlsName) {
@Override
public void run() {
DistributedLockService.create(dlsName, getSystem());
lockClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1);
suspendClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1);
assertThat(isLockGrantor(dlsName)).isFalse();
}
};
final SerializableRunnable suspendLocking =
new SerializableRunnable("Suspend locking " + dlsName) {
@Override
public void run() {
suspendClientSuspendLockingBehaves.suspend();
}
};
final SerializableRunnable resumeLocking =
new SerializableRunnable("Resume locking " + dlsName) {
@Override
public void run() {
suspendClientSuspendLockingBehaves.resume();
}
};
final SerializableRunnable lockKey = new SerializableRunnable("Get lock " + dlsName) {
@Override
public void run() {
lockClientSuspendLockingBehaves.lock();
}
};
final SerializableRunnable unlockKey = new SerializableRunnable("Unlock " + dlsName) {
@Override
public void run() {
lockClientSuspendLockingBehaves.unlock();
}
};
// create grantor
logger.info("[testSuspendLockingBehaves] Create grantor " + dlsName);
vmGrantor.invoke(new SerializableRunnable("Create grantor " + dlsName) {
@Override
public void run() {
DistributedLockService.create(dlsName, getSystem());
DistributedLockService.getServiceNamed(dlsName).lock(key1, -1, -1);
DistributedLockService.getServiceNamed(dlsName).unlock(key1);
assertThat(isLockGrantor(dlsName)).isTrue();
}
});
// create dls in other vms
vmOne.invoke(createDLS);
vmTwo.invoke(createDLS);
vmThree.invoke(createDLS);
// get a lock
logger.info("[testSuspendLockingBehaves] line up vms for lock");
vmOne.invoke(lockKey);
AsyncInvocation vmTwoLocking = vmTwo.invokeAsync(lockKey);
Wait.pause(2000); // make sure vmTwo is first in line
AsyncInvocation vmThreeLocking = vmThree.invokeAsync(lockKey);
Wait.pause(2000);
// make sure vmTwo and vmThree are still waiting for lock on key1
Wait.pause(100);
assertThat(vmTwoLocking.isAlive()).isTrue();
Wait.pause(100);
assertThat(vmThreeLocking.isAlive()).isTrue();
// let vmTwo get key
logger.info("[testSuspendLockingBehaves] unlock so vmTwo can get key");
vmOne.invoke(unlockKey);
ThreadUtils.join(vmTwoLocking, 10 * 1000);
// start suspending in vmOne and vmTwo
logger.info("[testSuspendLockingBehaves] start suspending requests");
AsyncInvocation vmOneSuspending = vmOne.invokeAsync(suspendLocking);
Wait.pause(2000); // make sure vmOne is first in line
AsyncInvocation vmTwoSuspending = vmTwo.invokeAsync(suspendLocking);
Wait.pause(2000);
// let vmThree finish locking key
logger.info("[testSuspendLockingBehaves] unlock so vmThree can get key");
vmTwo.invoke(unlockKey);
ThreadUtils.join(vmThreeLocking, 10 * 1000);
// have vmOne get back in line for locking key
logger.info("[testSuspendLockingBehaves] start another lock request");
AsyncInvocation vmOneLockingAgain = vmOne.invokeAsync(lockKey);
Wait.pause(2000);
// let vmOne suspend locking
logger.info("[testSuspendLockingBehaves] let vmOne suspend locking");
Wait.pause(100);
assertThat(vmOneSuspending.isAlive()).isTrue();
vmThree.invoke(unlockKey);
ThreadUtils.join(vmOneSuspending, 10 * 1000);
// start suspending in vmThree
logger
.info("[testSuspendLockingBehaves] line up vmThree for suspending");
AsyncInvocation vmThreeSuspending = vmThree.invokeAsync(suspendLocking);
Wait.pause(2000);
// let vmTwo suspend locking
logger.info("[testSuspendLockingBehaves] let vmTwo suspend locking");
Wait.pause(100);
assertThat(vmTwoSuspending.isAlive()).isTrue();
vmOne.invoke(resumeLocking);
ThreadUtils.join(vmTwoSuspending, 10 * 1000);
// let vmOne get that lock
logger.info("[testSuspendLockingBehaves] let vmOne get that lock");
Wait.pause(100);
assertThat(vmOneLockingAgain.isAlive()).isTrue();
vmTwo.invoke(resumeLocking);
ThreadUtils.join(vmOneLockingAgain, 10 * 1000);
// let vmThree suspend locking
logger.info("[testSuspendLockingBehaves] let vmThree suspend locking");
Wait.pause(100);
assertThat(vmThreeSuspending.isAlive()).isTrue();
vmOne.invoke(unlockKey);
ThreadUtils.join(vmThreeSuspending, 10 * 1000);
// done
vmThree.invoke(resumeLocking);
}
private static BasicLockClient suspendClientSuspendLockingBehaves;
private static BasicLockClient lockClientSuspendLockingBehaves;
/**
* Test that exlusive locking prohibits locking activity
*/
@Test
public void testSuspendLockingBlocksUntilNoLocks() throws InterruptedException {
final String name = getUniqueName();
distributedCreateService(2, name, true);
final DistributedLockService service = getServiceNamed(name);
// Get lock from other VM. Since same thread needs to lock and unlock,
// invoke asynchronously, get lock, wait to be notified, then unlock.
VM vm1 = getVM(1);
vm1.invokeAsync(new SerializableRunnable("Lock & unlock in vm1") {
@Override
public void run() {
DistributedLockService service2 = getServiceNamed(name);
assertThat(service2.lock("lock", -1, -1)).isTrue();
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException ex) {
out.println("Unexpected InterruptedException");
fail("interrupted");
}
}
service2.unlock("lock");
}
});
// Let vm1's thread get the lock and go into wait()
sleep(100);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
setGot(service.suspendLocking(-1));
setDone(true);
service.resumeLocking();
}
});
setGot(false);
setDone(false);
thread.start();
// Let thread start, make sure it's blocked in suspendLocking
sleep(100);
assertThat(getGot() || getDone())
.withFailMessage("Before release, got: " + getGot() + ", done: " + getDone()).isFalse();
vm1.invoke(new SerializableRunnable("notify vm1 to unlock") {
@Override
public void run() {
synchronized (monitor) {
monitor.notify();
}
}
});
// Let thread finish, make sure it successfully suspended and is done
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
return getDone();
}
@Override
public String description() {
return null;
}
};
GeodeAwaitility.await().untilAsserted(ev);
if (!getGot() || !getDone()) {
dumpAllStacks();
}
assertThat(getGot() && getDone())
.withFailMessage("After release, got: " + getGot() + ", done: " + getDone()).isTrue();
}
@Test
public void testSuspendLockingInterruptiblyIsInterruptible() throws Exception {
started = false;
gotLock = false;
exception = null;
// Lock entire service in first thread
final String name = getUniqueName();
final DistributedLockService service = DistributedLockService.create(name, dlstSystem);
assertThat(service.suspendLocking(1000)).isTrue();
// Start second thread that tries to lock in second thread
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
started = true;
gotLock = service.suspendLockingInterruptibly(-1);
} catch (InterruptedException ex) {
exception = ex;
}
}
});
thread2.start();
// Interrupt second thread
while (!started)
Thread.yield();
thread2.interrupt();
ThreadUtils.join(thread2, 20 * 1000);
// Expect it got InterruptedException and didn't lock the service
Thread.sleep(500);
assertThat(gotLock).isFalse();
assertThat(exception).isNotNull();
// Unlock entire service in first thread
service.resumeLocking();
Thread.sleep(500);
// Make sure it didn't get locked by second thread
assertThat(service.suspendLocking(1000)).isTrue();
DistributedLockService.destroy(name);
}
@Test
public void testSuspendLockingIsNotInterruptible() throws Exception {
started = false;
gotLock = false;
exception = null;
wasFlagSet = false;
// Lock entire service in first thread
final String name = getUniqueName();
final DistributedLockService service = DistributedLockService.create(name, dlstSystem);
assertThat(service.suspendLocking(1000)).isTrue();
// Start second thread that tries to lock in second thread
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
started = true;
gotLock = service.suspendLocking(-1);
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable ex) {
exception = ex;
}
wasFlagSet = Thread.currentThread().isInterrupted();
}
});
thread2.start();
// Interrupt second thread
while (!started)
Thread.yield();
thread2.interrupt();
// Expect it didn't get an exception and didn't lock the service
Thread.sleep(500);
assertThat(gotLock).isFalse();
assertThat(exception).isNull();
// Unlock entire service in first thread
service.resumeLocking();
ThreadUtils.join(thread2, 20 * 1000);
// Now thread2 should have gotten the lock, not the exception, but the
// thread's flag should be set
logger.info("[testSuspendLockingIsNotInterruptible]" + " gotLock="
+ gotLock + " wasFlagSet=" + wasFlagSet + " exception=" + exception, exception);
assertThat(gotLock).isTrue();
assertThat(exception).isNull();
assertThat(wasFlagSet).isTrue();
}
/**
* Tests what happens when you attempt to lock a name on a lock service that has been destroyed.
*
* @author David Whitlock
*/
@Test
public void testLockDestroyedService() {
String serviceName = this.getUniqueName();
DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
DistributedLockService.destroy(serviceName);
try {
boolean locked = service.lock("TEST", -1, -1);
fail("Lock of destroyed service returned: " + locked);
} catch (LockServiceDestroyedException ex) {
// pass...
}
}
@Test
public void testDepartedLastOwnerWithLease() {
final String serviceName = this.getUniqueName();
// Create service in this VM
DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
assertThat(service.lock("key", -1, -1)).isTrue();
service.unlock("key");
// Create service in other VM
VM otherVm = VM.getVM(0);
otherVm.invoke(new SerializableRunnable() {
@Override
public void run() {
DistributedLockService service2 = DistributedLockService.create(serviceName, dlstSystem);
service2.lock("key", -1, 360000);
service2.unlock("key");
// Wait for asynchronous messaging to complete
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
fail("interrupted");
}
disconnectFromDS();
}
});
// Now lock back in this VM
assertThat(service.lock("key", -1, -1)).isTrue();
}
@Test
public void testDepartedLastOwnerNoLease() {
final String serviceName = this.getUniqueName();
// Create service in this VM
DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
assertThat(service.lock("key", -1, -1)).isTrue();
service.unlock("key");
// Create service in other VM
VM otherVm = VM.getVM(0);
otherVm.invoke(new SerializableRunnable() {
@Override
public void run() {
DistributedLockService service2 = DistributedLockService.create(serviceName, dlstSystem);
service2.lock("key", -1, -1);
service2.unlock("key");
// Wait for asynchronous messaging to complete
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
fail("interrupted");
}
disconnectFromDS();
}
});
// Now lock back in this VM
assertThat(service.lock("key", -1, -1)).isTrue();
}
/**
* Tests for 32461 R3 StuckLocks can occur on locks with an expiration lease
* <p>
* VM-A locks/unlocks "lock", VM-B leases "lock" and disconnects, VM-C attempts to lock "lock" and
* old dlock throws StuckLockException. VM-C should now succeed in acquiring the lock.
*/
@Test
public void testBug32461() {
if (logger.isDebugEnabled()) {
logger.debug("[testBug32461] prepping");
}
final String serviceName = getUniqueName();
final Object objName = "32461";
final int VM_A = 0;
final int VM_B = 1;
final int VM_C = 2;
// VM-A locks/unlocks "lock"...
if (logger.isDebugEnabled()) {
logger.debug("[testBug32461] VM-A locks/unlocks '32461'");
}
VM.getVM(VM_A).invoke(new SerializableRunnable() {
@Override
public void run() {
remoteCreateService(serviceName);
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(service.lock(objName, -1, Long.MAX_VALUE)).isTrue();
service.unlock(objName);
}
});
// VM-B leases "lock" and disconnects,
if (logger.isDebugEnabled()) {
logger.debug("[testBug32461] VM_B leases '32461' and disconnects");
}
VM.getVM(VM_B).invoke(new SerializableRunnable() {
@Override
public void run() {
remoteCreateService(serviceName);
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(service.lock(objName, -1, Long.MAX_VALUE)).isTrue();
DistributedLockService.destroy(serviceName);
disconnectFromDS();
}
});
if (logger.isDebugEnabled()) {
logger.debug("[testBug32461] VM_C attempts to lock '32461'");
}
VM.getVM(VM_C).invoke(new SerializableRunnable() {
@Override
public void run() {
remoteCreateService(serviceName);
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
assertThat(service.lock(objName, -1, -1)).isTrue();
service.unlock(objName);
}
});
}
@Test
public void testNoStuckLock() {
final String serviceName = this.getUniqueName();
final Object keyWithLease = "key-with-lease";
final Object keyNoLease = "key-no-lease";
// Create service in this VM
DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
assertThat(service.lock(keyWithLease, -1, -1)).isTrue();
service.unlock(keyWithLease);
assertThat(service.lock(keyNoLease, -1, -1)).isTrue();
service.unlock(keyNoLease);
// Create service in other VM
VM otherVm = VM.getVM(0);
otherVm.invoke(new SerializableRunnable() {
@Override
public void run() {
DistributedLockService service2 = DistributedLockService.create(serviceName, dlstSystem);
service2.lock(keyWithLease, -1, 360000);
service2.lock(keyNoLease, -1, -1);
disconnectFromDS();
}
});
// Now lock back in this VM... no stuck locks anymore
assertThat(service.lock(keyWithLease, -1, -1)).isTrue();
service.unlock(keyWithLease);
assertThat(service.lock(keyNoLease, -1, -1)).isTrue();
service.unlock(keyNoLease);
}
volatile boolean startedThread1_testReleaseOrphanedGrant;
volatile boolean releaseThread1_testReleaseOrphanedGrant;
volatile boolean startedThread2_testReleaseOrphanedGrant;
volatile boolean gotLockThread2_testReleaseOrphanedGrant;
/**
* Client requests lock and then interrupts lock request before processing the grant reply. This
* causes the Client to send a release msg to the grantor.
*/
@Test
public void testReleaseOrphanedGrant_Local() throws Exception {
DLockRequestProcessor.setDebugReleaseOrphanedGrant(true);
DLockRequestProcessor.setWaitToProcessDLockResponse(false);
try {
startedThread2_testReleaseOrphanedGrant = false;
gotLockThread2_testReleaseOrphanedGrant = false;
releaseThread1_testReleaseOrphanedGrant = false;
logger.info("[testReleaseOrphanedGrant_Local] create lock service");
final String serviceName = getUniqueName();
final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
// thread to get lock and wait and then unlock
final Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
logger.info("[testReleaseOrphanedGrant_Local] get the lock");
assertThat(service.lock("obj", -1, -1)).isTrue();
DLockRequestProcessor.setWaitToProcessDLockResponse(true);
startedThread1_testReleaseOrphanedGrant = true;
synchronized (Thread.currentThread()) {
while (!releaseThread1_testReleaseOrphanedGrant) {
try {
Thread.currentThread().wait();
} catch (InterruptedException ignore) {
fail("interrupted");
}
}
}
logger.info("[testReleaseOrphanedGrant_Local] unlock the lock");
service.unlock("obj");
}
});
thread1.start();
while (!startedThread1_testReleaseOrphanedGrant) {
Thread.yield();
}
// thread to interrupt lockInterruptibly call to cause zombie grant
final Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
logger
.info("[testReleaseOrphanedGrant_Local] call lockInterruptibly");
startedThread2_testReleaseOrphanedGrant = true;
assertThat(service.lockInterruptibly("obj", -1, -1)).isFalse();
} catch (InterruptedException expected) {
Thread.currentThread().interrupt();
}
}
});
thread2.start();
while (!startedThread2_testReleaseOrphanedGrant) {
Thread.yield();
}
// release first thread to unlock
logger.info("[testReleaseOrphanedGrant_Local] release 1st thread");
Thread.sleep(500);
synchronized (thread1) {
releaseThread1_testReleaseOrphanedGrant = true;
thread1.notifyAll();
}
Thread.sleep(500);
// while first thread is stuck on waitToProcessDLockResponse,
// interrupt 2nd thread
logger.info("[testReleaseOrphanedGrant_Local] interrupt 2nd thread");
thread2.interrupt();
ThreadUtils.join(thread2, 20 * 1000);
// release waitToProcessDLockResponse
logger.info("[testReleaseOrphanedGrant_Local] process lock response");
Thread.sleep(500);
DLockRequestProcessor.setWaitToProcessDLockResponse(false);
// relock obj to make sure zombie release worked
logger.info("[testReleaseOrphanedGrant_Local] verify lock not held");
assertThat(service.lock("obj", 1000, -1)).isTrue();
} finally {
DLockRequestProcessor.setDebugReleaseOrphanedGrant(false);
DLockRequestProcessor.setWaitToProcessDLockResponse(false);
}
}
private static volatile Thread threadVM1_testReleaseOrphanedGrant_Remote;
private static volatile Thread threadVM2_testReleaseOrphanedGrant_Remote;
private static volatile boolean startedThreadVM1_testReleaseOrphanedGrant_Remote;
private static volatile boolean releaseThreadVM1_testReleaseOrphanedGrant_Remote;
private static volatile boolean unlockedThreadVM1_testReleaseOrphanedGrant_Remote;
private static volatile boolean startedThreadVM2_testReleaseOrphanedGrant_Remote;
@Test
public void testReleaseOrphanedGrant_Remote() throws Exception {
doTestReleaseOrphanedGrant_Remote(false);
}
@Test
public void testReleaseOrphanedGrant_RemoteWithDestroy() throws Exception {
doTestReleaseOrphanedGrant_Remote(true);
}
private void doTestReleaseOrphanedGrant_Remote(final boolean destroyLockService)
throws InterruptedException {
final VM vm1 = VM.getVM(0);
final VM vm2 = VM.getVM(1);
try {
logger.info("[testReleaseOrphanedGrant_Remote] create lock service");
final String serviceName = getUniqueName();
final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
// lock and unlock to make sure this vm is grantor
assertThat(service.lock("obj", -1, -1)).isTrue();
service.unlock("obj");
// thread to get lock and wait and then unlock
vm1.invokeAsync(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testReleaseOrphanedGrant_Remote] get the lock");
threadVM1_testReleaseOrphanedGrant_Remote = Thread.currentThread();
connectDistributedSystem();
DistributedLockService service_vm1 =
DistributedLockService.create(serviceName, getSystem());
assertThat(service_vm1.lock("obj", -1, -1)).isTrue();
synchronized (threadVM1_testReleaseOrphanedGrant_Remote) {
while (!releaseThreadVM1_testReleaseOrphanedGrant_Remote) {
try {
startedThreadVM1_testReleaseOrphanedGrant_Remote = true;
Thread.currentThread().wait();
} catch (InterruptedException ignore) {
fail("interrupted");
}
}
}
logger.info("[testReleaseOrphanedGrant_Remote] unlock the lock");
service_vm1.unlock("obj");
unlockedThreadVM1_testReleaseOrphanedGrant_Remote = true;
}
});
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
while (!startedThreadVM1_testReleaseOrphanedGrant_Remote) {
Thread.yield();
}
}
});
Thread.sleep(500);
// thread to interrupt lockInterruptibly call to cause zombie grant
vm2.invokeAsync(new SerializableRunnable() {
@Override
public void run() {
logger
.info("[testReleaseOrphanedGrant_Remote] call lockInterruptibly");
threadVM2_testReleaseOrphanedGrant_Remote = Thread.currentThread();
DistributedLockService service_vm2 =
DistributedLockService.create(serviceName, getSystem());
startedThreadVM2_testReleaseOrphanedGrant_Remote = true;
try {
DLockRequestProcessor.setDebugReleaseOrphanedGrant(true);
DLockRequestProcessor.setWaitToProcessDLockResponse(true);
assertThat(service_vm2.lockInterruptibly("obj", -1, -1)).isFalse();
} catch (InterruptedException expected) {
Thread.currentThread().interrupt();
}
}
});
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
while (!startedThreadVM2_testReleaseOrphanedGrant_Remote) {
Thread.yield();
}
}
});
Thread.sleep(500);
// release first thread to unlock
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
logger
.info("[testReleaseOrphanedGrant_Remote] release 1st thread");
synchronized (threadVM1_testReleaseOrphanedGrant_Remote) {
releaseThreadVM1_testReleaseOrphanedGrant_Remote = true;
threadVM1_testReleaseOrphanedGrant_Remote.notifyAll();
}
}
});
Thread.sleep(500); // lock is being released, grantor will grant lock to vm2
// while first thread is stuck on waitToProcessDLockResponse,
// interrupt 2nd thread
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
logger
.info("[testReleaseOrphanedGrant_Remote] interrupt 2nd thread");
threadVM2_testReleaseOrphanedGrant_Remote.interrupt();
ThreadUtils.join(threadVM2_testReleaseOrphanedGrant_Remote, 5 * 60 * 1000);
if (destroyLockService) {
logger
.info("[testReleaseOrphanedGrant_Remote] destroy lock service");
DistributedLockService.destroy(serviceName);
assertThat(DistributedLockService.getServiceNamed(serviceName)).isNull();
}
}
});
Thread.sleep(500); // grant is blocked while reply processor is being destroyed
// release waitToProcessDLockResponse
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
logger
.info("[testReleaseOrphanedGrant_Remote] process lock response");
DLockRequestProcessor.setWaitToProcessDLockResponse(false);
}
});
Thread.sleep(500); // process grant and send zombie release to grantor
// relock obj to make sure zombie release worked
logger.info("[testReleaseOrphanedGrant_Remote] verify lock not held");
assertThat(service.lock("obj", 1000, -1)).isTrue();
} finally {
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
logger
.info("[testReleaseOrphanedGrant_Remote] clean up DebugReleaseOrphanedGrant");
DLockRequestProcessor.setDebugReleaseOrphanedGrant(false);
DLockRequestProcessor.setWaitToProcessDLockResponse(false);
}
});
}
}
@Test
public void testDestroyLockServiceAfterGrantResponse() {
VM vm0 = VM.getVM(0);
final String serviceName = getUniqueName();
vm0.invoke(() -> createLockGrantor(serviceName));
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
if (message instanceof DLockResponseMessage) {
DistributedLockService.destroy(serviceName);
}
}
});
connectDistributedSystem();
final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
try {
service.lock("obj", -1, -1);
fail("The lock service should have been destroyed");
} catch (LockServiceDestroyedException expected) {
// Do nothing
}
vm0.invoke(new SerializableRunnable("check to make sure the lock is not orphaned") {
@Override
public void run() {
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
// lock and unlock to make sure this vm is grantor
assertThat(service.lock("obj", -1, -1)).isTrue();
service.unlock("obj");
}
});
}
@Test
public void testDestroyLockServiceBeforeGrantRequest() {
VM vm0 = VM.getVM(0);
final String serviceName = getUniqueName();
vm0.invoke(() -> createLockGrantor(serviceName));
DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
@Override
public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
if (message instanceof DLockRequestMessage) {
DistributedLockService.destroy(serviceName);
}
}
});
connectDistributedSystem();
final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem);
try {
service.lock("obj", -1, -1);
fail("The lock service should have been destroyed");
} catch (LockServiceDestroyedException expected) {
// Do nothing
}
vm0.invoke(new SerializableRunnable("check to make sure the lock is not orphaned") {
@Override
public void run() {
final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
// lock and unlock to make sure this vm is grantor
assertThat(service.lock("obj", -1, -1)).isTrue();
service.unlock("obj");
}
});
}
private static void createLockGrantor(String serviceName) {
connectDistributedSystem();
final DistributedLockService service =
DistributedLockService.create(serviceName, dlstSystem);
// lock and unlock to make sure this vm is grantor
// implementation detail: as long as this VM gets the first lock, it will be the grantor
assertThat(service.lock("obj", -1, -1)).isTrue();
service.unlock("obj");
}
////////// Private test methods
protected synchronized boolean getDone() {
return done;
}
protected synchronized void setDone(boolean done) {
this.done = done;
}
private synchronized boolean getGot() {
return got;
}
protected synchronized void setGot(boolean got) {
this.got = got;
}
protected static Boolean lock(String serviceName, Object name) {
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
boolean locked = service.lock(name, 1000, -1);
return locked;
}
protected static Boolean tryLock(String serviceName, Object name, Long wait) {
DLockService service = DLockService.getInternalServiceNamed(serviceName);
boolean locked = service.lock(name, wait, -1, true);
return locked;
}
protected static Boolean unlock(String serviceName, Object name) {
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
try {
service.unlock(name);
return Boolean.TRUE;
} catch (LockNotHeldException e) {
return Boolean.FALSE;
} catch (Exception e) {
e.printStackTrace();
return Boolean.FALSE;
}
}
private static Boolean tryToLock(String serviceName) {
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
boolean locked = service.lock("obj", 1000, -1);
if (locked) {
service.unlock("obj");
}
return locked;
}
private void doOneGetsAndOthersTimeOut(int numVMs, int numThreadsPerVM) throws Exception {
final String serviceName = getUniqueName() + "-" + numVMs + "-" + numThreadsPerVM;
final String objectName = "obj";
System.out.println("Starting testtt " + serviceName);
distributedCreateService(numVMs, serviceName, true);
hits = 0;
completes = 0;
blackboard.initCount();
blackboard.setIsLocked(false);
// tell them all to request a lock and increment
long timeout = 1000;
long holdTime = (timeout * 5);
final Host host = Host.getHost(0);
for (int vm = 0; vm < numVMs; vm++) {
final int finalvm = vm;
for (int thread = 0; thread < numThreadsPerVM; thread++) {
final int finalthread = thread;
(new Thread(() -> {
Boolean result = VM.getVM(finalvm)
.invoke(() -> DistributedLockServiceDUnitTest.getLockAndIncrement(
serviceName, objectName, timeout, holdTime));
if (result) {
incHits();
}
incCompletes();
}, "doOneGetsAndOthersTimeOut-" + thread)).start();
}
}
// wait for timeout
// wait for completion or timeout
long start = System.currentTimeMillis();
while ((completes < numVMs * numThreadsPerVM)
&& (System.currentTimeMillis() - start < holdTime * 10)) {
Thread.sleep(200);
}
// assert that only one got ownership
if (hits != 1) {
ThreadUtils.dumpAllStacks();
}
assertThat(hits).isEqualTo(1)
.withFailMessage("number of VMs that got ownership is wrong");
// assert that all the others timed out
assertThat(completes).isEqualTo(numVMs * numThreadsPerVM)
.withFailMessage("number of threads that completed is wrong");
// Check final value of entry
long count = blackboard.getCount();
assertThat(count).isEqualTo(1).withFailMessage("Final entry value wrong");
System.out.println("Done testtt " + serviceName);
}
// 2, 2... expect 4, but get 3
private void doOneGetsThenOtherGets(int numVMs, int numThreadsPerVM) throws Exception {
final String serviceName = getUniqueName() + "-" + numVMs + "-" + numThreadsPerVM;
final String objectName = "obj";
System.out.println("Starting testtt " + serviceName);
distributedCreateService(numVMs, serviceName, true);
hits = 0;
completes = 0;
blackboard.initCount();
blackboard.setIsLocked(false);
// tell all VMs to lock, long timeout, short hold time
// each one gets lock, increments and releases
final long timeout = Math.max(1000 * numVMs * numThreadsPerVM, 10 * 1000); // at least 10
// seconds
final Host host = Host.getHost(0);
for (int vm = 0; vm < numVMs; vm++) {
final int finalvm = vm;
for (int thread = 0; thread < numThreadsPerVM; thread++) {
final int finalthread = thread;
(new Thread(() -> {
System.out.println("VM " + finalvm + ", thread " + finalthread + " in " + serviceName
+ " about to invoke");
Boolean result = null;
try {
result = VM.getVM(finalvm)
.invoke(() -> DistributedLockServiceDUnitTest.getLockAndIncrement(
serviceName, objectName, timeout, 0L));
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("VM " + finalvm + ", thread " + finalthread + " in " + serviceName
+ " got result " + result);
if (result) {
incHits();
}
incCompletes();
})).start();
}
}
// wait for completion or timeout
long start = System.currentTimeMillis();
while (completes < numVMs * numThreadsPerVM) {
if (!(System.currentTimeMillis() - start < timeout * 2)) {
System.out.println("Test serviceName timed out");
break;
}
Thread.sleep(200);
}
// assert that all completed
assertThat(completes).isEqualTo(numVMs * numThreadsPerVM)
.withFailMessage("number of threads that completed is wrong");
// -------------------------------------------------------
// assert that all were able to lock
if (hits != numVMs * numThreadsPerVM) {
ThreadUtils.dumpAllStacks();
}
assertThat(hits).isEqualTo(numVMs * numThreadsPerVM)
.withFailMessage("number of VMs that got ownership is wrong");
// Check final value of entry
long count = blackboard.getCount();
assertThat(count).isEqualTo(numVMs * numThreadsPerVM)
.withFailMessage("Blackboard.getCount() wrong");
System.out.println("Done testtt " + serviceName);
}
@Test
public void testTokenCleanup() {
final String dlsName = getUniqueName();
final VM vmGrantor = VM.getVM(0);
final VM vm1 = VM.getVM(1);
// final VM vm2 =VM.getVM(2);
final String key1 = "key1";
// vmGrantor creates grantor
vmGrantor.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testTokenCleanup] vmGrantor creates grantor");
connectDistributedSystem();
DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem());
assertThat(dls.lock(key1, -1, -1)).isTrue();
assertThat(dls.isLockGrantor()).isTrue();
assertThat(dls.getToken(key1)).isNotNull();
dls.unlock(key1);
assertThat(dls.getToken(key1)).isNotNull();
// token should be removed when freeResources is called
dls.freeResources(key1);
// assertThat(dls.getToken(key1)).isNull();
DLockToken token = dls.getToken(key1);
assertThat(token).withFailMessage("Failed with bug 38180: " + token).isNull();
// make sure there are NO tokens at all
Collection tokens = dls.getTokens();
assertThat(tokens.size()).isEqualTo(0)
.withFailMessage("Failed with bug 38180: tokens=" + tokens);
}
});
// vm1 locks and frees key1
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testTokenCleanup] vm1 locks key1");
connectDistributedSystem();
DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem());
assertThat(dls.lock(key1, -1, -1)).isTrue();
assertThat(dls.isLockGrantor()).isFalse();
assertThat(dls.getToken(key1)).isNotNull();
dls.unlock(key1);
assertThat(dls.getToken(key1)).isNotNull();
dls.freeResources(key1);
// assertThat(dls.getToken(key1)).isNull();
DLockToken token = dls.getToken(key1);
assertThat(token).withFailMessage("Failed with bug 38180: " + token).isNull();
// make sure there are NO tokens at all
Collection tokens = dls.getTokens();
assertThat(tokens.size()).isEqualTo(0)
.withFailMessage("Failed with bug 38180: tokens=" + tokens);
}
});
// vm1 tests recursion
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testTokenCleanup] vm1 tests recursion");
connectDistributedSystem();
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName);
assertThat(dls.lock(key1, -1, -1)).isTrue(); // 1
assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(1);
assertThat(dls.lock(key1, -1, -1)).isTrue(); // 2
assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(2);
assertThat(dls.lock(key1, -1, -1)).isTrue(); // 3
assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(3);
DLockToken token0 = dls.getToken(key1);
assertThat(token0).isNotNull();
Collection tokens = dls.getTokens();
assertThat(tokens.contains(token0)).isTrue();
assertThat(tokens.size()).isEqualTo(1);
dls.unlock(key1); // 1
assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(2);
dls.freeResources(key1);
DLockToken token1 = dls.getToken(key1);
assertThat(token1).isNotNull();
assertThat(token1).isEqualTo(token0);
tokens = dls.getTokens();
assertThat(tokens.contains(token1)).isTrue();
assertThat(tokens.size()).isEqualTo(1);
dls.unlock(key1); // 2
assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(1);
dls.freeResources(key1);
assertThat(dls.getToken(key1)).isNotNull();
DLockToken token2 = dls.getToken(key1);
assertThat(token2).isNotNull();
assertThat(token2).isEqualTo(token0);
tokens = dls.getTokens();
assertThat(tokens.contains(token2)).isTrue();
assertThat(tokens.size()).isEqualTo(1);
dls.unlock(key1); // 3
assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(0);
dls.freeResources(key1);
DLockToken token3 = dls.getToken(key1);
assertThat(token3).withFailMessage("Failed with bug 38180: " + token3).isNull();
// make sure there are NO tokens at all
tokens = dls.getTokens();
assertThat(tokens.size()).isEqualTo(0)
.withFailMessage("Failed with bug 38180: tokens=" + tokens);
}
});
}
@Test
public void testGrantTokenCleanup() {
final String dlsName = getUniqueName();
final VM vmGrantor = VM.getVM(0);
final VM vm1 = VM.getVM(1);
final String key1 = "key1";
// vmGrantor creates grantor
vmGrantor.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testGrantTokenCleanup] vmGrantor creates grantor");
connectDistributedSystem();
DistributedLockService dls = DLockService.create(dlsName, getSystem(), true, true, true);
assertThat(dls.lock(key1, -1, -1)).isTrue();
assertThat(dls.isLockGrantor()).isTrue();
DLockGrantor grantor = ((DLockService) dls).getGrantor();
assertThat(grantor).isNotNull();
DLockGrantor.DLockGrantToken grantToken = grantor.getGrantToken(key1);
assertThat(grantToken).isNotNull();
logger.info("[testGrantTokenCleanup] vmGrantor unlocks key1");
dls.unlock(key1);
assertThat(grantor.getGrantToken(key1)).isNull();
}
});
// vm1 locks and frees key1
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testTokenCleanup] vm1 locks key1");
connectDistributedSystem();
DLockService dls =
(DLockService) DLockService.create(dlsName, getSystem(), true, true, false);
assertThat(dls.lock(key1, -1, -1)).isTrue();
logger.info("[testTokenCleanup] vm1 frees key1");
dls.unlock(key1);
// Without automateFreeResources, token for key1 still exists until freeResources is called
assertThat(dls.getToken(key1)).isNotNull();
dls.freeResources(key1);
// make sure token for key1 is gone
DLockToken token = dls.getToken(key1);
assertThat(token).withFailMessage("token should have been cleaned up").isNull();
// make sure there are NO tokens at all
Collection tokens = dls.getTokens();
assertThat(tokens.size()).isEqualTo(0)
.withFailMessage("There should be no tokens");
}
});
// vmGrantor frees key1
vmGrantor.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testTokenCleanup] vmGrantor frees key1");
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName);
// Because automateFreeResources is true, DLockToken and DLockGrantToken should have been
// removed when vm1 unlocked key1
// make sure token for key1 is gone
DLockToken token = dls.getToken(key1);
assertThat(token).withFailMessage("token should have been cleaned up").isNull();
// make sure there are NO tokens at all
Collection tokens = dls.getTokens();
assertThat(tokens.size()).isEqualTo(0)
.withFailMessage("There should be no tokens");
// make sure there are NO grant tokens at all
DLockGrantor grantor = dls.getGrantor();
Collection grantTokens = grantor.getGrantTokens();
assertThat(grantTokens.size()).isEqualTo(0)
.withFailMessage("There should be no tokens");
}
});
}
private static final AtomicBoolean testLockQuery_whileVM1Locks = new AtomicBoolean();
@Test
public void testLockQuery() {
final String dlsName = getUniqueName();
final VM vmGrantor = VM.getVM(0);
final VM vm1 = VM.getVM(1);
final VM vm2 = VM.getVM(2);
final String key1 = "key1";
// vmGrantor creates grantor
vmGrantor.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vmGrantor creates grantor");
connectDistributedSystem();
DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem());
assertThat(dls.lock(key1, -1, -1)).isTrue();
assertThat(dls.isLockGrantor()).isTrue();
dls.unlock(key1);
dls.freeResources(key1);
}
});
AsyncInvocation whileVM1Locks = null;
try {
// vm1 locks key1
whileVM1Locks = vm1.invokeAsync(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vm1 locks key1");
connectDistributedSystem();
DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem());
assertThat(dls.lock(key1, -1, -1)).isTrue();
assertThat(dls.isLockGrantor()).isFalse();
try {
synchronized (testLockQuery_whileVM1Locks) {
testLockQuery_whileVM1Locks.set(true);
testLockQuery_whileVM1Locks.notifyAll();
long maxWait = 10000;
StopWatch timer = new StopWatch(true);
while (testLockQuery_whileVM1Locks.get()) { // while true
long timeLeft = maxWait - timer.elapsedTimeMillis();
if (timeLeft > 0) {
testLockQuery_whileVM1Locks.wait(timeLeft);
} else {
fail("Test attempted to wait too long");
}
}
}
} catch (InterruptedException e) {
org.apache.geode.test.dunit.Assert.fail(e.getMessage(), e);
}
logger.info("[testLockQuery] vm1 unlocks key1");
dls.unlock(key1);
dls.freeResources(key1);
}
});
// wait for vm1 to set testLockQuery_whileVM1Locks
// get DistributedMember for vm1
final DistributedMember vm1Member = vm1.invoke(() -> {
logger.info("[testLockQuery] vm1 waits for locking thread");
synchronized (testLockQuery_whileVM1Locks) {
long maxWait = 10000;
StopWatch timer = new StopWatch(true);
while (!testLockQuery_whileVM1Locks.get()) { // while false
long timeLeft = maxWait - timer.elapsedTimeMillis();
if (timeLeft > 0) {
testLockQuery_whileVM1Locks.wait(timeLeft);
} else {
fail("Test attempted to wait too long");
}
}
}
return getSystem().getDistributedMember();
});
assertThat(vm1Member).isNotNull();
// vmGrantor tests positive local dlock query
vmGrantor.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vmGrantor tests local query");
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName);
DLockRemoteToken result = dls.queryLock(key1);
assertThat(result).isNotNull();
assertThat(result.getName()).isEqualTo(key1);
assertThat(result.getLeaseId() != -1).isTrue();
assertThat(result.getLeaseExpireTime()).isEqualTo(MAX_VALUE);
RemoteThread lesseeThread = result.getLesseeThread();
assertThat(lesseeThread).isNotNull();
assertThat(lesseeThread.getDistributedMember()).isEqualTo(vm1Member);
assertThat(result.getLessee()).isEqualTo(vm1Member);
// nothing to test for on threadId unless we serialize info from vm1
}
});
// vm2 tests positive remote dlock query
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vm2 tests remote query");
connectDistributedSystem();
DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem());
DLockRemoteToken result = dls.queryLock(key1);
assertThat(result).isNotNull();
assertThat(result.getName()).isEqualTo(key1);
assertThat(result.getLeaseId() != -1).isTrue();
assertThat(result.getLeaseExpireTime()).isEqualTo(MAX_VALUE);
RemoteThread lesseeThread = result.getLesseeThread();
assertThat(lesseeThread).isNotNull();
assertThat(lesseeThread.getDistributedMember()).isEqualTo(vm1Member);
assertThat(result.getLessee()).isEqualTo(vm1Member);
// nothing to test for on threadId unless we serialize info from vm1
}
});
} finally { // guarantee that testLockQuery_whileVM1Locks is notfied!
// vm1 sets and notifies testLockQuery_whileVM1Locks to release lock
vm1.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vm1 notifies/releases key1");
synchronized (testLockQuery_whileVM1Locks) {
testLockQuery_whileVM1Locks.set(false);
testLockQuery_whileVM1Locks.notifyAll();
}
}
});
ThreadUtils.join(whileVM1Locks, 10 * 1000);
if (whileVM1Locks.exceptionOccurred()) {
org.apache.geode.test.dunit.Assert.fail("Test failed", whileVM1Locks.getException());
}
}
// vmGrantor tests negative local dlock query
vmGrantor.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vmGrantor tests negative query");
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName);
DLockRemoteToken result = dls.queryLock(key1);
assertThat(result).isNotNull();
assertThat(result.getName()).isEqualTo(key1);
assertThat(result.getLeaseId()).isEqualTo(-1);
assertThat(result.getLeaseExpireTime()).isEqualTo(0);
assertThat(result.getLesseeThread()).isNull();
assertThat(result.getLessee()).isNull();
}
});
// vm2 tests negative remote dlock query
vm2.invoke(new SerializableRunnable() {
@Override
public void run() {
logger.info("[testLockQuery] vm2 tests negative query");
DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName);
DLockRemoteToken result = dls.queryLock(key1);
assertThat(result).isNotNull();
assertThat(result.getName()).isEqualTo(key1);
assertThat(result.getLeaseId()).isEqualTo(-1);
assertThat(result.getLeaseExpireTime()).isEqualTo(0);
assertThat(result.getLesseeThread()).isNull();
assertThat(result.getLessee()).isNull();
}
});
}
////////// Support methods
private List<VM> distributedCreateService(int numVMs, String serviceName, boolean useLocalVM) {
// create an entry - use scope DIST_ACK, not GLOBAL, since we're testing
// that explicit use of the ownership api provides the synchronization
final List<VM> vms = new ArrayList<>();
if (useLocalVM) {
vms.add(VM.getVM(VM.getCurrentVMNum()));
remoteCreateService(serviceName);
}
vms.addAll(forNumVMsInvoke(numVMs, () -> remoteCreateService(serviceName)));
// remoteCreateService(serviceName);
return vms;
}
/**
* Creates a new DistributedLockService in a remote VM.
*
* @param name The name of the newly-created DistributedLockService. It is recommended that the
* name of the Region be the {@link #getUniqueName()} of the test, or at least derive from
* it.
*/
protected static void remoteCreateService(String name) {
DistributedLockService newService = DistributedLockService.create(name, dlstSystem);
System.out.println("Created " + newService);
}
private static Boolean getLockAndIncrement(String serviceName, Object objectName, long timeout,
long holdTime) throws Exception {
System.out.println("[getLockAndIncrement] In getLockAndIncrement");
DistributedLockService service = DistributedLockService.getServiceNamed(serviceName);
boolean got = service.lock(objectName, timeout, -1);
System.out.println("[getLockAndIncrement] In getLockAndIncrement - got is " + got);
if (got) {
// Make sure we don't think anyone else is holding the lock
if (blackboard.getIsLocked()) {
String msg = "obtained lock on " + serviceName + "/" + objectName
+ " but blackboard was locked, grantor=" + ((DLockService) service).getLockGrantorId()
+ ", isGrantor=" + service.isLockGrantor();
System.out.println("[getLockAndIncrement] In getLockAndIncrement: " + msg);
fail(msg);
}
blackboard.setIsLocked(true);
long count = blackboard.getCount();
System.out
.println("[getLockAndIncrement] In getLockAndIncrement - count is " + count + " for "
+ serviceName + "/" + objectName);
Thread.sleep(holdTime);
blackboard.incCount();
blackboard.setIsLocked(false);
System.out
.println("[getLockAndIncrement] In getLockAndIncrement: " + "cleared blackboard lock for "
+ serviceName + "/" + objectName);
service.unlock(objectName);
}
System.out.println("[getLockAndIncrement] Returning from getLockAndIncrement");
return got;
}
private synchronized void incHits() {
hits = hits + 1;
}
private synchronized void incCompletes() {
completes = completes + 1;
}
/**
* Assumes there is only one host, and invokes the given method in the first numVMs VMs that host
* knows about.
*/
private List<VM> forNumVMsInvoke(int numVMs, final SerializableRunnableIF runnable) {
List<VM> vms = new ArrayList<>();
for (int i = 0; i < numVMs; i++) {
VM thisVm = VM.getVM(i);
thisVm.invoke(runnable);
vms.add(thisVm);
}
return vms;
}
public static class BasicLockClient implements Runnable {
private static Logger logger = LogService.getLogger();
private static final Integer LOCK = 1;
private static final Integer UNLOCK = 2;
private static final Integer SUSPEND = 3;
private static final Integer RESUME = 4;
private static final Integer STOP = 5;
private final Object sync = new Object();
private final Thread thread;
private final String dlsName;
private final String key;
// ordered queue of requests
private final LinkedList requests = new LinkedList();
// map of requests to operations
private final Map operationsMap = new HashMap();
// map of requests to throwables
private final Map throwables = new HashMap();
private final Set completedRequests = new HashSet();
private int latestRequest = 0;
private boolean stayinAlive = true;
public BasicLockClient(String dlsName, String key) {
this.dlsName = dlsName;
this.key = key;
this.thread = new Thread(this);
this.thread.start();
}
@Override
public void run() {
logger.info("BasicLockClient running");
while (this.stayinAlive) {
synchronized (this.sync) {
if (this.requests.size() > 0) {
Integer requestId = (Integer) this.requests.removeFirst();
Integer operationId = (Integer) this.operationsMap.get(requestId);
try {
switch (operationId) {
case 1:
logger.info("BasicLockClient lock");
assertThat(DistributedLockService.getServiceNamed(dlsName).lock(key, -1, -1))
.isTrue();
break;
case 2:
logger.info("BasicLockClient unlock");
DistributedLockService.getServiceNamed(dlsName).unlock(key);
break;
case 3:
logger.info("BasicLockClient suspendLocking");
assertThat(DistributedLockService.getServiceNamed(dlsName).suspendLocking(-1))
.isTrue();
break;
case 4:
logger.info("BasicLockClient resumeLocking");
DistributedLockService.getServiceNamed(dlsName).resumeLocking();
break;
case 5:
logger.info("BasicLockClient stopping");
this.stayinAlive = false;
break;
} // switch
} // try
catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
this.throwables.put(requestId, t);
} finally {
this.completedRequests.add(requestId);
this.sync.notify();
}
}
try {
this.sync.wait();
} catch (InterruptedException e) {
logger.info("BasicLockClient interrupted");
this.stayinAlive = false;
}
} // sync
} // while
}
public void lock() throws Error {
doLock(LOCK);
}
public void unlock() throws Error {
doLock(UNLOCK);
}
public void suspend() throws Error {
doLock(SUSPEND);
}
public void resume() throws Error {
doLock(RESUME);
}
public void stop() throws Error {
doLock(STOP);
}
private void doLock(Integer lock) {
try {
synchronized (this.sync) {
this.latestRequest++;
Integer requestId = this.latestRequest;
this.operationsMap.put(requestId, lock);
this.requests.add(requestId);
this.sync.notify();
long maxWait = System.currentTimeMillis() + 2000;
while (!this.completedRequests.contains(requestId)) {
long waitMillis = maxWait - System.currentTimeMillis();
assertThat(waitMillis > 0).isTrue();
this.sync.wait(waitMillis);
}
Throwable t = (Throwable) this.throwables.get(requestId);
if (t != null) {
throw new Error(t);
}
}
} catch (Exception ex) {
throw new Error(ex);
}
}
}
}