| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed; |
| |
| import static java.lang.Boolean.TRUE; |
| import static java.lang.Long.MAX_VALUE; |
| import static java.lang.System.out; |
| import static org.apache.geode.distributed.DistributedLockService.getServiceNamed; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.apache.geode.test.dunit.ThreadUtils.dumpAllStacks; |
| import static org.apache.geode.test.dunit.VM.getVM; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.junit.Assert.fail; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.assertj.core.api.Assertions; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionMessage; |
| import org.apache.geode.distributed.internal.DistributionMessageObserver; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.locks.DLockGrantor; |
| import org.apache.geode.distributed.internal.locks.DLockRemoteToken; |
| import org.apache.geode.distributed.internal.locks.DLockRequestProcessor; |
| import org.apache.geode.distributed.internal.locks.DLockRequestProcessor.DLockRequestMessage; |
| import org.apache.geode.distributed.internal.locks.DLockRequestProcessor.DLockResponseMessage; |
| import org.apache.geode.distributed.internal.locks.DLockService; |
| import org.apache.geode.distributed.internal.locks.DLockToken; |
| import org.apache.geode.distributed.internal.locks.RemoteThread; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.util.StopWatch; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.AsyncInvocation; |
| import org.apache.geode.test.dunit.Host; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.SerializableCallable; |
| import org.apache.geode.test.dunit.SerializableRunnable; |
| import org.apache.geode.test.dunit.SerializableRunnableIF; |
| import org.apache.geode.test.dunit.ThreadUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.Wait; |
| import org.apache.geode.test.dunit.WaitCriterion; |
| import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; |
| import org.apache.geode.test.junit.categories.DLockTest; |
| |
| /** |
| * This class tests distributed ownership via the DistributedLockService api. |
| */ |
| @Category({DLockTest.class}) |
| public final class DistributedLockServiceDUnitTest extends JUnit4DistributedTestCase { |
| private static Logger logger = LogService.getLogger(); |
| |
| private static DistributedSystem dlstSystem; |
| private static DistributedLockBlackboard blackboard; |
| private static Object monitor = new Object(); |
| |
| private int hits = 0; |
| private int completes = 0; |
| private boolean done; |
| private boolean got; |
| |
| /** |
| * Returns a previously created (or new, if this is the first time this method is called in this |
| * VM) distributed system which is somewhat configurable via hydra test parameters. |
| */ |
| @Override |
| public final void postSetUp() throws Exception { |
| |
| createBlackboard(); |
| Invoke.invokeInEveryVM(() -> createBlackboard()); |
| |
| // Create a DistributedSystem in every VM |
| connectDistributedSystem(); |
| |
| Invoke.invokeInEveryVM(() -> connectDistributedSystem()); |
| } |
| |
| private void createBlackboard() throws Exception { |
| if (blackboard == null) { |
| blackboard = DistributedLockBlackboardImpl.getInstance(); |
| } |
| } |
| |
| @Override |
| public final void preTearDown() { |
| Invoke.invokeInEveryVM(() -> destroyAllDLockServices()); |
| } |
| |
| @Override |
| public void postTearDown() { |
| disconnectAllFromDS(); |
| } |
| |
| public static void destroyAllDLockServices() { |
| DLockService.destroyAll(); |
| dlstSystem = null; |
| } |
| |
| /** |
| * Connects a DistributedSystem, saves it in static variable "system" |
| */ |
| protected static void connectDistributedSystem() { |
| dlstSystem = (new DistributedLockServiceDUnitTest()).getSystem(); |
| } |
| |
| private static volatile boolean stop_testFairness; |
| |
| @Test |
| public void testFairness() throws InterruptedException, ExecutionException { |
| final int[] vmThreads = new int[] {1, 4, 8, 16}; |
| final int numVms = vmThreads.length; |
| final int numThreads = Arrays.stream(vmThreads).sum(); |
| final List<VM> vms = new ArrayList<>(); |
| final List<Future> futures = new ArrayList<>(); |
| final String serviceName = "testFairness_" + getUniqueName(); |
| final Object lock = "lock"; |
| |
| // get the lock and hold it until all threads are ready to go |
| DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| assertThat(service.lock(lock, -1, -1)).isTrue(); |
| |
| // create the lock service in all vms |
| vms.addAll(forNumVMsInvoke(numVms, () -> remoteCreateService(serviceName))); |
| Thread.sleep(100); |
| |
| Invoke.invokeInEveryVM(() -> { |
| stop_testFairness = false; |
| }); |
| |
| // line up threads for the fairness race... |
| for (int vm = 0; vm < numVms; vm++) { |
| logger.info("[testFairness] lining up " + vmThreads[vm] + " threads in vm " + vm); |
| |
| for (int j = 0; j < vmThreads[vm]; j++) { |
| SerializableCallable<Integer> fairnessRunnable = new SerializableCallable<Integer>() { |
| @Override |
| public Integer call() { |
| // lock, inc count, and unlock until stop_testFairness is set true |
| final AtomicInteger lockCount = new AtomicInteger(0); |
| try { |
| DistributedLockService service = |
| DistributedLockService.getServiceNamed(serviceName); |
| while (!stop_testFairness) { |
| assertThat(service.lock(lock, -1, -1)).isTrue(); |
| lockCount.incrementAndGet(); |
| service.unlock(lock); |
| } |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| logger.warn(t); |
| fail(t.getMessage()); |
| } finally { |
| return lockCount.get(); |
| } |
| } |
| }; |
| |
| futures.add(vms.get(vm).invokeAsync(() -> fairnessRunnable.call())); |
| } |
| } |
| |
| // wait for all threads to be ready to start the race |
| for (int i = 0; i < numVms; i++) { |
| final int vmId = i; |
| |
| vms.get(vmId).invoke(() -> { |
| DLockService localService = |
| (DLockService) DistributedLockService.getServiceNamed(serviceName); |
| |
| await() |
| .untilAsserted(() -> assertThat(localService.getStats().getLockWaitsInProgress()) |
| .isEqualTo(vmThreads[vmId])); |
| }); |
| } |
| |
| // start the race! |
| service.unlock(lock); |
| Thread.sleep(1000 * 5); // 5 seconds |
| |
| // stop the race... |
| assertThat(service.lock(lock, -1, -1)).isTrue(); |
| for (VM vm : vms) { |
| vm.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| stop_testFairness = true; |
| } |
| }); |
| } |
| |
| // release the lock and destroy the lock service |
| service.unlock(lock); |
| |
| // calc total locks granted... |
| Integer totalLocks = 0; |
| Integer maxLocks = 0; |
| Integer minLocks = Integer.MAX_VALUE; |
| |
| for (Future future : futures) { |
| Integer numLocks = (Integer) future.get(); |
| totalLocks += numLocks; |
| if (minLocks > numLocks) { |
| minLocks = numLocks; |
| } |
| if (maxLocks < numLocks) { |
| maxLocks = numLocks; |
| } |
| } |
| |
| vms.forEach(vm -> vm.invoke(() -> DistributedLockService.destroy(serviceName))); |
| |
| logger.info("[testFairness] totalLocks=" + totalLocks + " minLocks=" |
| + minLocks + " maxLocks=" + maxLocks); |
| |
| int expectedLocks = (totalLocks / numThreads) + 1; |
| |
| int deviation = (int) (expectedLocks * 0.3); |
| int lowThreshold = expectedLocks - deviation; |
| int highThreshold = expectedLocks + deviation; |
| |
| logger.info("[testFairness] deviation=" + deviation + " expectedLocks=" |
| + expectedLocks + " lowThreshold=" + lowThreshold + " highThreshold=" + highThreshold); |
| |
| assertThat(minLocks >= lowThreshold).withFailMessage("minLocks is less than lowThreshold") |
| .isTrue(); |
| assertThat(maxLocks <= highThreshold).withFailMessage("maxLocks is greater than highThreshold") |
| .isTrue(); |
| } |
| |
| @Test |
| public void testOneGetsAndOthersTimeOut() throws Exception { |
| doOneGetsAndOthersTimeOut(1, 1); |
| doOneGetsAndOthersTimeOut(4, 3); |
| } |
| |
| private static InternalDistributedMember getLockGrantor(String serviceName) { |
| DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service).isNotNull(); |
| InternalDistributedMember grantor = service.getLockGrantorId().getLockGrantorMember(); |
| assertThat(grantor).isNotNull(); |
| System.out.println("In identifyLockGrantor - grantor is " + grantor); |
| return grantor; |
| } |
| |
| private static Boolean isLockGrantor(String serviceName) { |
| DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service).isNotNull(); |
| Boolean result = service.isLockGrantor(); |
| System.out.println("In isLockGrantor: " + result); |
| return result; |
| } |
| |
| protected static void becomeLockGrantor(String serviceName) { |
| DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service).isNotNull(); |
| System.out.println("About to call becomeLockGrantor..."); |
| service.becomeLockGrantor(); |
| } |
| |
| @Test |
| public void testGrantorSelection() { |
| // TODO change distributedCreateService usage to be concurrent threads |
| |
| final String serviceName = "testGrantorSelection_" + getUniqueName(); |
| |
| final List<VM> vmList = distributedCreateService(4, serviceName, true); |
| |
| |
| getLockGrantor(serviceName); |
| |
| assertGrantorIsConsistent(serviceName, vmList); |
| } |
| |
| @Test |
| public void testBasicGrantorRecovery() { |
| int numVMs = 4; |
| final String serviceName = "testBasicGrantorRecovery_" + getUniqueName(); |
| final List<VM> vmList = distributedCreateService(numVMs, serviceName, false); |
| |
| // Apparently it's necessary to query the lock grantor to have a server be nominated grantor. |
| vmList.get(0).invoke(() -> getLockGrantor(serviceName)); |
| |
| VM originalGrantorVM = identifyLockGrantorWithSanityCheck(serviceName, vmList); |
| VM lockHolder = null; |
| for (VM vm : vmList) { |
| if (vm != originalGrantorVM) { |
| lockHolder = vm; |
| vm.invoke(() -> DLockService.getServiceNamed(serviceName).lock("foo", 1200, -1)); |
| break; |
| } |
| } |
| |
| final InternalDistributedMember originalGrantorMember = |
| originalGrantorVM.invoke(() -> getLockGrantor(serviceName)); |
| |
| System.out.println("originalGrantorMember = " + originalGrantorMember); |
| final InternalDistributedMember originalGrantorSelfReportedMember = originalGrantorVM |
| .invoke(() -> InternalDistributedSystem.getAnyInstance().getDistributedMember()); |
| assertThat(originalGrantorMember).isEqualTo(originalGrantorSelfReportedMember); |
| |
| originalGrantorVM.invoke(() -> disconnectFromDS()); |
| |
| vmList.remove(originalGrantorVM); |
| |
| await("vm0 leave has been processed") |
| .until(() -> vmList.parallelStream() |
| .noneMatch(vm -> vm.invoke(() -> InternalDistributedSystem |
| .getAnyInstance() |
| .getAllOtherMembers() |
| .contains(originalGrantorMember)))); |
| |
| // It seems that we need to actually use the lock service to have a new grantor be picked. |
| lockHolder.invoke(() -> DLockService.getServiceNamed(serviceName).unlock("foo")); |
| |
| final VM newGrantorMember = identifyLockGrantorWithSanityCheck(serviceName, vmList); |
| assertThat(newGrantorMember).isNotEqualTo(originalGrantorMember); |
| } |
| |
| /** |
| * This is just a better-sounding wrapper for when we want to do only consistency checks. |
| */ |
| private static void assertGrantorIsConsistent(String serviceName, Collection<VM> vmList) { |
| identifyLockGrantorWithSanityCheck(serviceName, vmList); |
| } |
| |
| private static VM identifyLockGrantorWithSanityCheck(String serviceName, Collection<VM> vmList) { |
| VM grantorVM = null; |
| |
| for (VM vm : vmList) { |
| if (vm.invoke(() -> isLockGrantor(serviceName))) { |
| grantorVM = vm; |
| } |
| } |
| |
| assertThat(grantorVM).withFailMessage("Some VM should think it's the lock grantor").isNotNull(); |
| |
| final DistributedMember grantorID = |
| grantorVM.invoke(() -> InternalDistributedSystem.getAnyInstance().getDistributedMember()); |
| |
| for (VM vm : vmList) { |
| if (vm != grantorVM) { |
| assertThat(vm.invoke(() -> isLockGrantor(serviceName))).isFalse(); |
| } |
| assertThat(vm.invoke(() -> getLockGrantor(serviceName))).isEqualTo(grantorID); |
| } |
| |
| return grantorVM; |
| } |
| |
| @Test |
| public void testLockFailover() { |
| final int originalGrantorVM = 0; |
| final int oneVM = 1; |
| final int twoVM = 2; |
| final String serviceName = "testLockFailover-" + getUniqueName(); |
| |
| // create lock services... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockFailover] create services"); |
| } |
| |
| VM.getVM(originalGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName)); |
| |
| VM.getVM(oneVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName)); |
| |
| VM.getVM(twoVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName)); |
| |
| VM.getVM(originalGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName)); |
| |
| Boolean isGrantor = VM.getVM(originalGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName)); |
| assertThat(isGrantor) |
| .withFailMessage("First member calling getLockGrantor failed to become grantor") |
| .isEqualTo(Boolean.TRUE); |
| |
| // get locks... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockFailover] get lock"); |
| } |
| |
| Boolean locked = VM.getVM(originalGrantorVM).invoke( |
| () -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + originalGrantorVM)); |
| assertThat(locked).withFailMessage("Failed to get lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| locked = VM.getVM(twoVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + twoVM)); |
| assertThat(locked).withFailMessage("Failed to get lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| locked = VM.getVM(oneVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + oneVM)); |
| assertThat(locked).withFailMessage("Failed to get lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| // disconnect originalGrantorVM... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockFailover] disconnect originalGrantorVM"); |
| } |
| |
| VM.getVM(originalGrantorVM).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| disconnectFromDS(); |
| } |
| }); |
| |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| |
| // verify locks by unlocking... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockFailover] release locks"); |
| } |
| |
| Boolean unlocked = VM.getVM(twoVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + twoVM)); |
| assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| unlocked = VM.getVM(oneVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + oneVM)); |
| assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| // switch locks... |
| locked = VM.getVM(oneVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + twoVM)); |
| assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| locked = VM.getVM(twoVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + oneVM)); |
| assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| unlocked = VM.getVM(oneVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + twoVM)); |
| assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| unlocked = VM.getVM(twoVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + oneVM)); |
| assertThat(unlocked).withFailMessage("Failed to release lock in testLockFailover") |
| .isEqualTo(Boolean.TRUE); |
| |
| // verify grantor is unique... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockFailover] verify grantor identity"); |
| } |
| |
| InternalDistributedMember oneID = VM.getVM(oneVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName)); |
| InternalDistributedMember twoID = VM.getVM(twoVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName)); |
| assertThat(oneID != null && twoID != null) |
| .withFailMessage("Failed to identifyLockGrantor in testLockFailover").isTrue(); |
| assertThat(twoID).withFailMessage("Failed grantor uniqueness in testLockFailover") |
| .isEqualTo(oneID); |
| } |
| |
| @Test |
| public void testLockThenBecomeLockGrantor() { |
| final int originalGrantorVM = 0; |
| final int becomeGrantorVM = 1; |
| final int thirdPartyVM = 2; |
| final String serviceName = "testLockThenBecomeLockGrantor-" + getUniqueName(); |
| |
| // create lock services... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockThenBecomeLockGrantor] create services"); |
| } |
| |
| VM.getVM(originalGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName)); |
| |
| try { |
| Thread.sleep(20); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| |
| VM.getVM(becomeGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName)); |
| |
| VM.getVM(thirdPartyVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.remoteCreateService(serviceName)); |
| |
| VM.getVM(originalGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor(serviceName)); |
| |
| Boolean isGrantor = VM.getVM(originalGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName)); |
| assertThat(isGrantor).isEqualTo(Boolean.TRUE) |
| .withFailMessage("First member calling getLockGrantor failed to become grantor"); |
| |
| // control... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockThenBecomeLockGrantor] check control"); |
| } |
| Boolean check = VM.getVM(becomeGrantorVM).invoke( |
| () -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + becomeGrantorVM)); |
| assertThat(check).isEqualTo(Boolean.FALSE) |
| .withFailMessage("Check of control failed... unlock succeeded but nothing locked"); |
| |
| // get locks... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockThenBecomeLockGrantor] get lock"); |
| } |
| |
| Boolean locked = VM.getVM(originalGrantorVM).invoke( |
| () -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + originalGrantorVM)); |
| assertThat(locked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to get lock in testLockThenBecomeLockGrantor"); |
| |
| locked = VM.getVM(thirdPartyVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + thirdPartyVM)); |
| assertThat(locked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to get lock in testLockThenBecomeLockGrantor"); |
| |
| locked = VM.getVM(becomeGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "KEY-" + becomeGrantorVM)); |
| assertThat(locked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to get lock in testLockThenBecomeLockGrantor"); |
| |
| // become lock grantor... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockThenBecomeLockGrantor] become lock grantor"); |
| } |
| |
| VM.getVM(becomeGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.becomeLockGrantor(serviceName)); |
| |
| try { |
| Thread.sleep(20); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| |
| isGrantor = VM.getVM(becomeGrantorVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName)); |
| assertThat(isGrantor).isEqualTo(Boolean.TRUE).withFailMessage("Failed to become lock grantor"); |
| |
| // verify locks by unlocking... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockThenBecomeLockGrantor] release locks"); |
| } |
| |
| Boolean unlocked = VM.getVM(originalGrantorVM).invoke( |
| () -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + originalGrantorVM)); |
| assertThat(unlocked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to release lock in testLockThenBecomeLockGrantor"); |
| |
| unlocked = VM.getVM(thirdPartyVM) |
| .invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + thirdPartyVM)); |
| assertThat(unlocked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to release lock in testLockThenBecomeLockGrantor"); |
| |
| unlocked = VM.getVM(becomeGrantorVM).invoke( |
| () -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + becomeGrantorVM)); |
| assertThat(unlocked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to release lock in testLockThenBecomeLockGrantor"); |
| |
| // test for bug in which transferred token gets re-entered causing lock recursion |
| unlocked = VM.getVM(becomeGrantorVM).invoke( |
| () -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY-" + becomeGrantorVM)); |
| assertThat(unlocked).isEqualTo(Boolean.FALSE) |
| .withFailMessage("Transfer of tokens caused lock recursion in held lock"); |
| } |
| |
| @Test |
| public void testBecomeLockGrantor() { |
| // create lock services... |
| int numVMs = 4; |
| final String serviceName = "testBecomeLockGrantor-" + getUniqueName(); |
| distributedCreateService(numVMs, serviceName, true); |
| |
| // each one gets a lock... |
| for (int vm = 0; vm < numVMs; vm++) { |
| final int finalvm = vm; |
| Boolean locked = VM.getVM(finalvm) |
| .invoke(() -> DistributedLockServiceDUnitTest.lock(serviceName, "obj-" + finalvm)); |
| assertThat(locked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to get lock in testBecomeLockGrantor"); |
| } |
| |
| // find the grantor... |
| int originalVM = -1; |
| InternalDistributedMember oldGrantor = null; |
| for (int vm = 0; vm < numVMs; vm++) { |
| Boolean isGrantor = VM.getVM(vm) |
| .invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName)); |
| if (isGrantor) { |
| originalVM = vm; |
| oldGrantor = VM.getVM(vm) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor( |
| serviceName)); |
| break; |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBecomeLockGrantor] original grantor is " + oldGrantor); |
| } |
| |
| // have one call becomeLockGrantor |
| for (int vm = 0; vm < numVMs; vm++) { |
| if (vm != originalVM) { |
| VM.getVM(vm) |
| .invoke(() -> DistributedLockServiceDUnitTest.becomeLockGrantor(serviceName)); |
| Boolean isGrantor = VM.getVM(vm) |
| .invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName)); |
| assertThat(isGrantor).isEqualTo(Boolean.TRUE) |
| .withFailMessage("isLockGrantor is false after calling becomeLockGrantor"); |
| break; |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBecomeLockGrantor] one vm has called becomeLockGrantor..."); |
| } |
| |
| InternalDistributedMember newGrantor = null; |
| for (int vm = 0; vm < numVMs; vm++) { |
| Boolean isGrantor = VM.getVM(vm) |
| .invoke(() -> DistributedLockServiceDUnitTest.isLockGrantor(serviceName)); |
| if (isGrantor) { |
| newGrantor = VM.getVM(vm) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockGrantor( |
| serviceName)); |
| break; |
| } |
| } |
| assertThat(newGrantor).isNotEqualTo(oldGrantor); |
| // verify locks still held by unlocking |
| // each one unlocks... |
| for (int vm = 0; vm < numVMs; vm++) { |
| final int finalvm = vm; |
| Boolean unlocked = VM.getVM(finalvm) |
| .invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "obj-" + finalvm)); |
| assertThat(unlocked).isEqualTo(Boolean.TRUE) |
| .withFailMessage("Failed to unlock in testBecomeLockGrantor"); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBecomeLockGrantor] finished"); |
| } |
| |
| // verify that pending requests are granted by unlocking them also |
| } |
| |
| @Test |
| public void testOnlyOneVmAcquiresWithTryLock() throws Exception { |
| final Long waitMillis = 100L; |
| |
| // create lock services... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testTryLock] create lock services"); |
| } |
| final String serviceName = "testTryLock-" + getUniqueName(); |
| final List<VM> vms = distributedCreateService(4, serviceName, false); |
| |
| final List<AsyncInvocation<Boolean>> invocations = vms.stream() |
| .map(vm -> vm.invokeAsync( |
| () -> DistributedLockServiceDUnitTest.tryLock(serviceName, "KEY", waitMillis))) |
| .collect(Collectors |
| .toList()); |
| |
| int lockCount = 0; |
| for (AsyncInvocation<Boolean> invocation : invocations) { |
| if (invocation.get()) { |
| lockCount++; |
| } |
| } |
| |
| assertThat(lockCount).isEqualTo(1) |
| .withFailMessage("More than one vm acquired the tryLock"); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testTryLock] unlock tryLock"); |
| } |
| int unlockCount = 0; |
| for (VM vm : vms) { |
| Boolean unlocked = |
| vm.invoke(() -> DistributedLockServiceDUnitTest.unlock(serviceName, "KEY")); |
| if (unlocked) { |
| unlockCount++; |
| } |
| } |
| |
| assertThat(unlockCount).withFailMessage("More than one vm unlocked the tryLock").isEqualTo(1); |
| } |
| |
| @Test |
| public void testOneGetsThenOtherGets() throws Exception { // (numVMs, numThreadsPerVM) |
| doOneGetsThenOtherGets(1, 1); |
| // doOneGetsThenOtherGets(2, 2); |
| // doOneGetsThenOtherGets(3, 3); |
| doOneGetsThenOtherGets(4, 3); |
| } |
| |
| @Test |
| public void testLockDifferentNames() { |
| String serviceName = getUniqueName(); |
| |
| // Same VM |
| remoteCreateService(serviceName); |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service.lock("obj1", -1, -1)).isTrue(); |
| assertThat(service.lock("obj2", -1, -1)).isTrue(); |
| service.unlock("obj1"); |
| service.unlock("obj2"); |
| |
| // Different VMs |
| VM vm = VM.getVM(0); |
| vm.invoke(() -> remoteCreateService(serviceName)); |
| assertThat(service.lock("masterVMobj", -1, -1)).isTrue(); |
| |
| assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, "otherVMobj", -1, 0))).isTrue(); |
| |
| service.unlock("masterVMobj"); |
| } |
| |
| @Test |
| public void testLocalGetLockAndIncrement() throws Exception { |
| String serviceName = getUniqueName(); |
| remoteCreateService(serviceName); |
| DistributedLockService.getServiceNamed(serviceName); |
| assertThat(getLockAndIncrement(serviceName, "localVMobj", -1, 0)).isTrue(); |
| } |
| |
| @Test |
| public void testRemoteGetLockAndIncrement() { |
| String serviceName = getUniqueName(); |
| VM vm = VM.getVM(0); |
| vm.invoke(() -> remoteCreateService(serviceName)); |
| assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, "remoteVMobj", |
| -1, 0))).isTrue(); |
| } |
| |
| @Test |
| public void testLockSameNameDifferentService() { |
| String serviceName1 = getUniqueName() + "_1"; |
| String serviceName2 = getUniqueName() + "_2"; |
| String objName = "obj"; |
| |
| // Same VM |
| remoteCreateService(serviceName1); |
| remoteCreateService(serviceName2); |
| DistributedLockService service1 = DistributedLockService.getServiceNamed(serviceName1); |
| DistributedLockService service2 = DistributedLockService.getServiceNamed(serviceName2); |
| assertThat(service1.lock(objName, -1, -1)).isTrue(); |
| assertThat(service2.lock(objName, -1, -1)).isTrue(); |
| service1.unlock(objName); |
| service2.unlock(objName); |
| |
| // Different VMs |
| VM vm = VM.getVM(0); |
| vm.invoke(() -> remoteCreateService(serviceName1)); |
| vm.invoke(() -> remoteCreateService(serviceName2)); |
| assertThat(service1.lock(objName, -1, -1)).isTrue(); |
| assertThat(vm.invoke(() -> getLockAndIncrement(serviceName2, objName, -1, 0))).isTrue(); |
| service1.unlock(objName); |
| } |
| |
| @Test |
| public void testLeaseDoesntExpire() { |
| String serviceName = getUniqueName(); |
| final Object objName = 3; |
| |
| // Same VM |
| remoteCreateService(serviceName); |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| // lock objName with a sufficiently long lease |
| assertThat(service.lock(objName, -1, 60000)).isTrue(); |
| // try to lock in another thread, with a timeout shorter than above lease |
| final boolean[] resultHolder = new boolean[] {false}; |
| Thread thread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| resultHolder[0] = !service.lock(objName, 1000, -1); |
| } |
| }); |
| thread.start(); |
| ThreadUtils.join(thread, 30 * 1000); |
| assertThat(resultHolder[0]).isTrue(); |
| // the unlock should succeed without throwing LeaseExpiredException |
| service.unlock(objName); |
| |
| // Different VM |
| VM vm = VM.getVM(0); |
| vm.invoke(() -> remoteCreateService(serviceName)); |
| // lock objName in this VM with a sufficiently long lease |
| assertThat(service.lock(objName, -1, 60000)).isTrue(); |
| // try to lock in another VM, with a timeout shorter than above lease |
| assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, objName, 1000L, 0L))).isFalse(); |
| // the unlock should succeed without throwing LeaseExpiredException |
| service.unlock(objName); |
| } |
| |
| @Test |
| public void testLockUnlock() { |
| String serviceName = getUniqueName(); |
| Object objName = 42; |
| |
| remoteCreateService(serviceName); |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| service.lock(objName, -1, -1); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| service.unlock(objName); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| } |
| |
| @Test |
| public void testLockExpireUnlock() throws Exception { |
| long leaseMs = 200; |
| long waitBeforeLockingMs = 210; |
| |
| String serviceName = getUniqueName(); |
| Object objName = 42; |
| |
| remoteCreateService(serviceName); |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| Thread.sleep(waitBeforeLockingMs); // should expire... |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| Assertions.assertThatThrownBy(() -> service.unlock(objName)) |
| .isInstanceOf(LeaseExpiredException.class); |
| } |
| |
| @Test |
| public void testLockRecursion() { |
| String serviceName = getUniqueName(); |
| Object objName = 42; |
| |
| remoteCreateService(serviceName); |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // initial lock... |
| assertThat(service.lock(objName, -1, -1)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // recursion +1... |
| assertThat(service.lock(objName, -1, -1)).isTrue(); |
| |
| // recursion -1... |
| service.unlock(objName); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // and unlock... |
| service.unlock(objName); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| } |
| |
| @Test |
| public void testLockRecursionWithExpiration() throws Exception { |
| long leaseMs = 500; |
| long waitBeforeLockingMs = 750; |
| |
| String serviceName = getUniqueName(); |
| Object objName = 42; |
| |
| remoteCreateService(serviceName); |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // initial lock... |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // recursion +1... |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // expire... |
| Thread.sleep(waitBeforeLockingMs); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // should fail... |
| try { |
| service.unlock(objName); |
| fail("unlock should have thrown LeaseExpiredException"); |
| } catch (LeaseExpiredException ex) { |
| } |
| |
| // relock it... |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // and unlock to verify no recursion... |
| service.unlock(objName); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); // throws failure!! |
| |
| // go thru again in different order... |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // initial lock... |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // expire... |
| Thread.sleep(waitBeforeLockingMs); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // relock it... |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| assertThat(service.isHeldByCurrentThread(objName)).isTrue(); |
| |
| // and unlock to verify no recursion... |
| service.unlock(objName); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| } |
| |
| @Test |
| public void testLeaseExpiresBeforeOtherLocks() throws InterruptedException { |
| leaseExpiresTest(false); |
| } |
| |
| @Test |
| public void testLeaseExpiresWhileOtherLocks() throws InterruptedException { |
| leaseExpiresTest(true); |
| } |
| |
| private void leaseExpiresTest(boolean tryToLockBeforeExpiration) throws InterruptedException { |
| long leaseMs = 100; |
| long waitBeforeLockingMs = tryToLockBeforeExpiration ? 50 : 110; |
| |
| final String serviceName = getUniqueName(); |
| final Object objName = 3; |
| |
| // Same VM |
| remoteCreateService(serviceName); |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| // lock objName with a short lease |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| Thread.sleep(waitBeforeLockingMs); |
| |
| if (waitBeforeLockingMs > leaseMs) { |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| } |
| |
| // try to lock in another thread - lease should have expired |
| final boolean[] resultHolder = new boolean[] {false}; |
| Thread thread = new Thread(() -> { |
| resultHolder[0] = service.lock(objName, -1, -1); |
| service.unlock(objName); |
| assertThat(!service.isHeldByCurrentThread(objName)).isTrue(); |
| }); |
| thread.start(); |
| ThreadUtils.join(thread, 30 * 1000); |
| assertThat(resultHolder[0]).isTrue(); |
| |
| // this thread's unlock should throw LeaseExpiredException |
| Assertions.assertThatThrownBy(() -> { |
| service.unlock(objName); |
| }).isInstanceOf(LeaseExpiredException.class); |
| |
| VM vm = VM.getVM(0); |
| vm.invoke(() -> remoteCreateService(serviceName)); |
| |
| // lock objName in this VM with a short lease |
| assertThat(service.lock(objName, -1, leaseMs)).isTrue(); |
| Thread.sleep(waitBeforeLockingMs); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLeaseExpires] succeed lock in other vm"); |
| } |
| // try to lock in another VM - should succeed |
| assertThat(vm.invoke(() -> getLockAndIncrement(serviceName, objName, (long) -1, 0L))) |
| .isEqualTo(TRUE); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLeaseExpires] unlock should throw LeaseExpiredException again"); |
| } |
| // this VMs unlock should throw LeaseExpiredException |
| Assertions.assertThatThrownBy(() -> { |
| service.unlock(objName); |
| }).isInstanceOf(LeaseExpiredException.class); |
| } |
| |
| @Test |
| public void testSuspendLockingAfterExpiration() throws Exception { |
| final long leaseMillis = 100; |
| final long suspendWaitMillis = 10000; |
| |
| final String serviceName = getUniqueName(); |
| final Object key = 3; |
| |
| // controller locks key and then expires - controller is grantor |
| |
| DistributedLockService dls = DistributedLockService.create(serviceName, getSystem()); |
| |
| assertThat(dls.lock(key, -1, leaseMillis)).isTrue(); |
| |
| // wait for expiration |
| Thread.sleep(leaseMillis * 2); |
| |
| Assertions.assertThatThrownBy(() -> dls.unlock(key)).isInstanceOf(LeaseExpiredException.class); |
| |
| // other vm calls suspend |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[leaseExpiresThenSuspendTest] call to suspend locking"); |
| } |
| VM.getVM(0).invoke(() -> { |
| final DistributedLockService dlock = |
| DistributedLockService.create(serviceName, getSystem()); |
| dlock.suspendLocking(suspendWaitMillis); |
| dlock.resumeLocking(); |
| assertThat(dlock.lock(key, -1, leaseMillis)).isTrue(); |
| dlock.unlock(key); |
| }); |
| } |
| |
| private volatile boolean started = false; |
| private volatile boolean gotLock = false; |
| volatile Throwable exception = null; |
| private volatile Throwable throwable = null; |
| |
| @Test |
| public void testLockInterruptiblyIsInterruptible() { |
| started = false; |
| gotLock = false; |
| exception = null; |
| throwable = null; |
| |
| // Get a lock in first thread |
| final String serviceName = getUniqueName(); |
| final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| service.becomeLockGrantor(); |
| assertThat(service.lock("obj", 1000, -1)).isTrue(); |
| |
| // Start second thread that tries to lock in second thread |
| Thread thread2 = new Thread(() -> { |
| try { |
| started = true; |
| gotLock = service.lockInterruptibly("obj", -1, -1); |
| } catch (InterruptedException ex) { |
| exception = ex; |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| throwable = t; |
| } |
| }); |
| thread2.start(); |
| |
| // Interrupt second thread |
| while (!started) |
| Thread.yield(); |
| thread2.interrupt(); |
| ThreadUtils.join(thread2, 20 * 1000); |
| |
| // Expect it got InterruptedException and didn't lock the service |
| assertThat(gotLock).isFalse(); |
| if (throwable != null) { |
| logger.warn("testLockInterruptiblyIsInterruptible threw unexpected Throwable", throwable); |
| } |
| assertThat(exception).isNotNull(); |
| |
| // Unlock "obj" in first thread |
| service.unlock("obj"); |
| |
| // Make sure it didn't get locked by second thread |
| logger.info( |
| "[testLockInterruptiblyIsInterruptible] try to get lock with timeout should not fail"); |
| assertThat(service.lock("obj", 5000, -1)).isTrue(); |
| DistributedLockService.destroy(serviceName); |
| } |
| |
| private volatile boolean wasFlagSet = false; |
| |
| @Test |
| public void testLockIsNotInterruptible() throws Exception { |
| // Lock entire service in first thread |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockIsNotInterruptible] lock in first thread"); |
| } |
| started = false; |
| gotLock = false; |
| exception = null; |
| wasFlagSet = false; |
| |
| final String serviceName = getUniqueName(); |
| final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| assertThat(service.lock("obj", 1000, -1)).isTrue(); |
| |
| // Start second thread that tries to lock in second thread |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockIsNotInterruptible] attempt lock in second thread"); |
| } |
| Thread thread2 = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| started = true; |
| gotLock = service.lock("obj", -1, -1); |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockIsNotInterruptible] thread2 finished lock() - got " + gotLock); |
| } |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable ex) { |
| logger.warn("[testLockIsNotInterruptible] Caught...", ex); |
| exception = ex; |
| } |
| wasFlagSet = Thread.currentThread().isInterrupted(); |
| } |
| }); |
| thread2.start(); |
| |
| // Interrupt second thread |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockIsNotInterruptible] interrupt second thread"); |
| } |
| while (!started) |
| Thread.yield(); |
| Thread.sleep(500); |
| thread2.interrupt(); |
| // Expect it didn't get an exception and didn't lock the service |
| Thread.sleep(500); |
| assertThat(gotLock).isFalse(); |
| assertThat(exception).isNull(); |
| |
| // Unlock entire service in first thread |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockIsNotInterruptible] unlock in first thread"); |
| } |
| service.unlock("obj"); |
| Thread.sleep(500); |
| |
| // Expect that thread2 should now complete execution. |
| ThreadUtils.join(thread2, 20 * 1000); |
| |
| // Now thread2 should have gotten the lock, not the exception, but the |
| // thread's flag should be set |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testLockIsNotInterruptible] verify second thread got lock"); |
| } |
| assertThat(exception).isNull(); |
| assertThat(gotLock).isTrue(); |
| assertThat(wasFlagSet).isTrue(); |
| } |
| |
| /** |
| * Test DistributedLockService.acquireExclusiveLocking(), releaseExclusiveLocking() |
| */ |
| @Test |
| public void testSuspendLockingBasic() { |
| final DistributedLockService service = |
| DistributedLockService.create(getUniqueName(), dlstSystem); |
| |
| try { |
| service.resumeLocking(); |
| fail("Didn't throw LockNotHeldException"); |
| } catch (LockNotHeldException ex) { |
| // expected |
| } |
| |
| assertThat(service.suspendLocking(-1)).isTrue(); |
| service.resumeLocking(); |
| |
| // It's not reentrant |
| assertThat(service.suspendLocking(1000)).isTrue(); |
| try { |
| service.suspendLocking(1); |
| fail("didn't get IllegalStateException"); |
| } catch (IllegalStateException ex) { |
| // expected |
| } |
| service.resumeLocking(); |
| |
| // Get "false" if another thread is holding it |
| Thread thread = new Thread(() -> { |
| System.out.println("new thread about to suspendLocking()"); |
| assertThat(service.suspendLocking(1000)).isTrue(); |
| }); |
| |
| thread.start(); |
| ThreadUtils.join(thread, 30 * 1000); |
| System.out.println("main thread about to suspendLocking"); |
| assertThat(!service.suspendLocking(1000)).isTrue(); |
| } |
| |
| /** |
| * Test that exlusive locking prohibits locking activity |
| */ |
| @Test |
| public void testSuspendLockingProhibitsLocking() { |
| final String name = getUniqueName(); |
| distributedCreateService(2, name, true); |
| DistributedLockService service = DistributedLockService.getServiceNamed(name); |
| |
| // Should be able to lock from other VM |
| VM vm1 = VM.getVM(1); |
| assertThat(vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock(name))).isTrue(); |
| |
| assertThat(service.suspendLocking(1000)).isTrue(); |
| |
| // vm1 is the grantor... use debugHandleSuspendTimeouts |
| vm1.invoke(new SerializableRunnable("setDebugHandleSuspendTimeouts") { |
| @Override |
| public void run() { |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(name); |
| assertThat(dls.isLockGrantor()).isTrue(); |
| DLockGrantor grantor = dls.getGrantorWithNoSync(); |
| grantor.setDebugHandleSuspendTimeouts(5000); |
| } |
| }); |
| |
| // Shouldn't be able to lock a name from another VM |
| assertThat(!vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock(name))).isTrue(); |
| |
| service.resumeLocking(); |
| |
| vm1.invoke(new SerializableRunnable("unsetDebugHandleSuspendTimeouts") { |
| @Override |
| public void run() { |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(name); |
| assertThat(dls.isLockGrantor()).isTrue(); |
| DLockGrantor grantor = dls.getGrantorWithNoSync(); |
| grantor.setDebugHandleSuspendTimeouts(0); |
| } |
| }); |
| |
| // Should be able to lock again |
| assertThat(vm1.invoke(() -> DistributedLockServiceDUnitTest.tryToLock(name))).isTrue(); |
| |
| } |
| |
| /** |
| * Test that suspend locking behaves under various usage patterns. This ensures that suspend and |
| * regular locks behave as ReadWriteLocks and processing occurs in order. |
| */ |
| @Test |
| public void testSuspendLockingBehaves() throws Exception { |
| try { |
| doTestSuspendLockingBehaves(); |
| } finally { |
| Invoke.invokeInEveryVM(new SerializableRunnable() { |
| @Override |
| public void run() { |
| try { |
| if (suspendClientSuspendLockingBehaves != null) { |
| suspendClientSuspendLockingBehaves.stop(); |
| suspendClientSuspendLockingBehaves = null; |
| } |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| logger.error("Error in testSuspendLockingBehaves finally", t); |
| } |
| try { |
| if (lockClientSuspendLockingBehaves != null) { |
| lockClientSuspendLockingBehaves.stop(); |
| lockClientSuspendLockingBehaves = null; |
| } |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| logger.error("Error in testSuspendLockingBehaves finally", t); |
| } |
| } |
| }); |
| } |
| } |
| |
| private void doTestSuspendLockingBehaves() { |
| final String dlsName = getUniqueName(); |
| final VM vmGrantor = VM.getVM(0); |
| final VM vmOne = VM.getVM(1); |
| final VM vmTwo = VM.getVM(2); |
| final VM vmThree = VM.getVM(3); |
| final String key1 = "key1"; |
| |
| // TODO: make sure suspend thread can get other locks |
| |
| // TODO: test local (in grantor) locks and suspends also |
| |
| // define some SerializableRunnables |
| final SerializableRunnable createDLS = new SerializableRunnable("Create " + dlsName) { |
| @Override |
| public void run() { |
| DistributedLockService.create(dlsName, getSystem()); |
| lockClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1); |
| suspendClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1); |
| assertThat(isLockGrantor(dlsName)).isFalse(); |
| } |
| }; |
| final SerializableRunnable suspendLocking = |
| new SerializableRunnable("Suspend locking " + dlsName) { |
| @Override |
| public void run() { |
| suspendClientSuspendLockingBehaves.suspend(); |
| } |
| }; |
| final SerializableRunnable resumeLocking = |
| new SerializableRunnable("Resume locking " + dlsName) { |
| @Override |
| public void run() { |
| suspendClientSuspendLockingBehaves.resume(); |
| } |
| }; |
| final SerializableRunnable lockKey = new SerializableRunnable("Get lock " + dlsName) { |
| @Override |
| public void run() { |
| lockClientSuspendLockingBehaves.lock(); |
| } |
| }; |
| final SerializableRunnable unlockKey = new SerializableRunnable("Unlock " + dlsName) { |
| @Override |
| public void run() { |
| lockClientSuspendLockingBehaves.unlock(); |
| } |
| }; |
| |
| // create grantor |
| logger.info("[testSuspendLockingBehaves] Create grantor " + dlsName); |
| vmGrantor.invoke(new SerializableRunnable("Create grantor " + dlsName) { |
| @Override |
| public void run() { |
| DistributedLockService.create(dlsName, getSystem()); |
| DistributedLockService.getServiceNamed(dlsName).lock(key1, -1, -1); |
| DistributedLockService.getServiceNamed(dlsName).unlock(key1); |
| assertThat(isLockGrantor(dlsName)).isTrue(); |
| } |
| }); |
| |
| // create dls in other vms |
| vmOne.invoke(createDLS); |
| vmTwo.invoke(createDLS); |
| vmThree.invoke(createDLS); |
| |
| // get a lock |
| logger.info("[testSuspendLockingBehaves] line up vms for lock"); |
| vmOne.invoke(lockKey); |
| AsyncInvocation vmTwoLocking = vmTwo.invokeAsync(lockKey); |
| Wait.pause(2000); // make sure vmTwo is first in line |
| AsyncInvocation vmThreeLocking = vmThree.invokeAsync(lockKey); |
| Wait.pause(2000); |
| |
| // make sure vmTwo and vmThree are still waiting for lock on key1 |
| Wait.pause(100); |
| assertThat(vmTwoLocking.isAlive()).isTrue(); |
| Wait.pause(100); |
| assertThat(vmThreeLocking.isAlive()).isTrue(); |
| |
| // let vmTwo get key |
| logger.info("[testSuspendLockingBehaves] unlock so vmTwo can get key"); |
| vmOne.invoke(unlockKey); |
| ThreadUtils.join(vmTwoLocking, 10 * 1000); |
| |
| // start suspending in vmOne and vmTwo |
| logger.info("[testSuspendLockingBehaves] start suspending requests"); |
| AsyncInvocation vmOneSuspending = vmOne.invokeAsync(suspendLocking); |
| Wait.pause(2000); // make sure vmOne is first in line |
| AsyncInvocation vmTwoSuspending = vmTwo.invokeAsync(suspendLocking); |
| Wait.pause(2000); |
| |
| // let vmThree finish locking key |
| logger.info("[testSuspendLockingBehaves] unlock so vmThree can get key"); |
| vmTwo.invoke(unlockKey); |
| ThreadUtils.join(vmThreeLocking, 10 * 1000); |
| |
| // have vmOne get back in line for locking key |
| logger.info("[testSuspendLockingBehaves] start another lock request"); |
| AsyncInvocation vmOneLockingAgain = vmOne.invokeAsync(lockKey); |
| Wait.pause(2000); |
| |
| // let vmOne suspend locking |
| logger.info("[testSuspendLockingBehaves] let vmOne suspend locking"); |
| Wait.pause(100); |
| assertThat(vmOneSuspending.isAlive()).isTrue(); |
| vmThree.invoke(unlockKey); |
| ThreadUtils.join(vmOneSuspending, 10 * 1000); |
| |
| // start suspending in vmThree |
| logger |
| .info("[testSuspendLockingBehaves] line up vmThree for suspending"); |
| AsyncInvocation vmThreeSuspending = vmThree.invokeAsync(suspendLocking); |
| Wait.pause(2000); |
| |
| // let vmTwo suspend locking |
| logger.info("[testSuspendLockingBehaves] let vmTwo suspend locking"); |
| Wait.pause(100); |
| assertThat(vmTwoSuspending.isAlive()).isTrue(); |
| vmOne.invoke(resumeLocking); |
| ThreadUtils.join(vmTwoSuspending, 10 * 1000); |
| |
| // let vmOne get that lock |
| logger.info("[testSuspendLockingBehaves] let vmOne get that lock"); |
| Wait.pause(100); |
| assertThat(vmOneLockingAgain.isAlive()).isTrue(); |
| vmTwo.invoke(resumeLocking); |
| ThreadUtils.join(vmOneLockingAgain, 10 * 1000); |
| |
| // let vmThree suspend locking |
| logger.info("[testSuspendLockingBehaves] let vmThree suspend locking"); |
| Wait.pause(100); |
| assertThat(vmThreeSuspending.isAlive()).isTrue(); |
| vmOne.invoke(unlockKey); |
| ThreadUtils.join(vmThreeSuspending, 10 * 1000); |
| |
| // done |
| vmThree.invoke(resumeLocking); |
| } |
| |
| private static BasicLockClient suspendClientSuspendLockingBehaves; |
| private static BasicLockClient lockClientSuspendLockingBehaves; |
| |
| /** |
| * Test that exlusive locking prohibits locking activity |
| */ |
| @Test |
| public void testSuspendLockingBlocksUntilNoLocks() throws InterruptedException { |
| |
| final String name = getUniqueName(); |
| distributedCreateService(2, name, true); |
| final DistributedLockService service = getServiceNamed(name); |
| |
| // Get lock from other VM. Since same thread needs to lock and unlock, |
| // invoke asynchronously, get lock, wait to be notified, then unlock. |
| VM vm1 = getVM(1); |
| AsyncInvocation asyncInvocation = |
| vm1.invokeAsync(new SerializableRunnable("Lock & unlock in vm1") { |
| @Override |
| public void run() { |
| DistributedLockService service2 = getServiceNamed(name); |
| assertThat(service2.lock("lock", -1, -1)).isTrue(); |
| synchronized (monitor) { |
| try { |
| monitor.wait(); |
| } catch (InterruptedException ex) { |
| out.println("Unexpected InterruptedException"); |
| fail("interrupted"); |
| } |
| } |
| service2.unlock("lock"); |
| } |
| }); |
| // Let vm1's thread get the lock and go into wait() |
| await().untilAsserted(() -> assertThat(asyncInvocation.isAlive()).isTrue()); |
| |
| Thread thread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| setGot(service.suspendLocking(-1)); |
| setDone(true); |
| service.resumeLocking(); |
| } |
| }); |
| setGot(false); |
| setDone(false); |
| thread.start(); |
| |
| // Let thread start, make sure it's blocked in suspendLocking |
| await().untilAsserted(() -> assertThat(thread.isAlive()).isTrue()); |
| assertThat(getGot() || getDone()) |
| .withFailMessage("Before release, got: " + getGot() + ", done: " + getDone()).isFalse(); |
| |
| vm1.invoke(new SerializableRunnable("notify vm1 to unlock") { |
| @Override |
| public void run() { |
| synchronized (monitor) { |
| monitor.notify(); |
| } |
| } |
| }); |
| |
| // Let thread finish, make sure it successfully suspended and is done |
| WaitCriterion ev = new WaitCriterion() { |
| @Override |
| public boolean done() { |
| return getDone(); |
| } |
| |
| @Override |
| public String description() { |
| return null; |
| } |
| }; |
| GeodeAwaitility.await().untilAsserted(ev); |
| if (!getGot() || !getDone()) { |
| dumpAllStacks(); |
| } |
| assertThat(getGot() && getDone()) |
| .withFailMessage("After release, got: " + getGot() + ", done: " + getDone()).isTrue(); |
| |
| } |
| |
| @Test |
| public void testSuspendLockingInterruptiblyIsInterruptible() throws Exception { |
| |
| started = false; |
| gotLock = false; |
| exception = null; |
| |
| // Lock entire service in first thread |
| final String name = getUniqueName(); |
| final DistributedLockService service = DistributedLockService.create(name, dlstSystem); |
| assertThat(service.suspendLocking(1000)).isTrue(); |
| |
| // Start second thread that tries to lock in second thread |
| Thread thread2 = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| started = true; |
| gotLock = service.suspendLockingInterruptibly(-1); |
| } catch (InterruptedException ex) { |
| exception = ex; |
| } |
| } |
| }); |
| thread2.start(); |
| |
| // Interrupt second thread |
| while (!started) |
| Thread.yield(); |
| thread2.interrupt(); |
| ThreadUtils.join(thread2, 20 * 1000); |
| |
| // Expect it got InterruptedException and didn't lock the service |
| Thread.sleep(500); |
| assertThat(gotLock).isFalse(); |
| assertThat(exception).isNotNull(); |
| |
| // Unlock entire service in first thread |
| service.resumeLocking(); |
| Thread.sleep(500); |
| |
| // Make sure it didn't get locked by second thread |
| assertThat(service.suspendLocking(1000)).isTrue(); |
| DistributedLockService.destroy(name); |
| } |
| |
| @Test |
| public void testSuspendLockingIsNotInterruptible() throws Exception { |
| |
| started = false; |
| gotLock = false; |
| exception = null; |
| wasFlagSet = false; |
| |
| // Lock entire service in first thread |
| final String name = getUniqueName(); |
| final DistributedLockService service = DistributedLockService.create(name, dlstSystem); |
| assertThat(service.suspendLocking(1000)).isTrue(); |
| |
| // Start second thread that tries to lock in second thread |
| Thread thread2 = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| started = true; |
| gotLock = service.suspendLocking(-1); |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable ex) { |
| exception = ex; |
| } |
| wasFlagSet = Thread.currentThread().isInterrupted(); |
| } |
| }); |
| thread2.start(); |
| |
| // Interrupt second thread |
| while (!started) |
| Thread.yield(); |
| thread2.interrupt(); |
| // Expect it didn't get an exception and didn't lock the service |
| Thread.sleep(500); |
| assertThat(gotLock).isFalse(); |
| assertThat(exception).isNull(); |
| |
| // Unlock entire service in first thread |
| service.resumeLocking(); |
| ThreadUtils.join(thread2, 20 * 1000); |
| |
| // Now thread2 should have gotten the lock, not the exception, but the |
| // thread's flag should be set |
| logger.info("[testSuspendLockingIsNotInterruptible]" + " gotLock=" |
| + gotLock + " wasFlagSet=" + wasFlagSet + " exception=" + exception, exception); |
| assertThat(gotLock).isTrue(); |
| assertThat(exception).isNull(); |
| assertThat(wasFlagSet).isTrue(); |
| } |
| |
| /** |
| * Tests what happens when you attempt to lock a name on a lock service that has been destroyed. |
| * |
| * @author David Whitlock |
| */ |
| @Test |
| public void testLockDestroyedService() { |
| String serviceName = this.getUniqueName(); |
| DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| DistributedLockService.destroy(serviceName); |
| try { |
| boolean locked = service.lock("TEST", -1, -1); |
| fail("Lock of destroyed service returned: " + locked); |
| |
| } catch (LockServiceDestroyedException ex) { |
| // pass... |
| } |
| } |
| |
| @Test |
| public void testDepartedLastOwnerWithLease() { |
| final String serviceName = this.getUniqueName(); |
| |
| // Create service in this VM |
| DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| assertThat(service.lock("key", -1, -1)).isTrue(); |
| service.unlock("key"); |
| |
| // Create service in other VM |
| VM otherVm = VM.getVM(0); |
| otherVm.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| DistributedLockService service2 = DistributedLockService.create(serviceName, dlstSystem); |
| service2.lock("key", -1, 360000); |
| service2.unlock("key"); |
| // Wait for asynchronous messaging to complete |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ex) { |
| fail("interrupted"); |
| } |
| disconnectFromDS(); |
| } |
| }); |
| |
| // Now lock back in this VM |
| assertThat(service.lock("key", -1, -1)).isTrue(); |
| |
| } |
| |
| @Test |
| public void testDepartedLastOwnerNoLease() { |
| final String serviceName = this.getUniqueName(); |
| |
| // Create service in this VM |
| DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| assertThat(service.lock("key", -1, -1)).isTrue(); |
| service.unlock("key"); |
| |
| // Create service in other VM |
| VM otherVm = VM.getVM(0); |
| otherVm.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| DistributedLockService service2 = DistributedLockService.create(serviceName, dlstSystem); |
| service2.lock("key", -1, -1); |
| service2.unlock("key"); |
| // Wait for asynchronous messaging to complete |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ex) { |
| fail("interrupted"); |
| } |
| disconnectFromDS(); |
| } |
| }); |
| |
| // Now lock back in this VM |
| assertThat(service.lock("key", -1, -1)).isTrue(); |
| |
| } |
| |
| /** |
| * Tests for 32461 R3 StuckLocks can occur on locks with an expiration lease |
| * <p> |
| * VM-A locks/unlocks "lock", VM-B leases "lock" and disconnects, VM-C attempts to lock "lock" and |
| * old dlock throws StuckLockException. VM-C should now succeed in acquiring the lock. |
| */ |
| @Test |
| public void testBug32461() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBug32461] prepping"); |
| } |
| |
| final String serviceName = getUniqueName(); |
| final Object objName = "32461"; |
| final int VM_A = 0; |
| final int VM_B = 1; |
| final int VM_C = 2; |
| |
| // VM-A locks/unlocks "lock"... |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBug32461] VM-A locks/unlocks '32461'"); |
| } |
| |
| VM.getVM(VM_A).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| remoteCreateService(serviceName); |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service.lock(objName, -1, Long.MAX_VALUE)).isTrue(); |
| service.unlock(objName); |
| } |
| }); |
| |
| // VM-B leases "lock" and disconnects, |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBug32461] VM_B leases '32461' and disconnects"); |
| } |
| |
| VM.getVM(VM_B).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| remoteCreateService(serviceName); |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service.lock(objName, -1, Long.MAX_VALUE)).isTrue(); |
| DistributedLockService.destroy(serviceName); |
| disconnectFromDS(); |
| } |
| }); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("[testBug32461] VM_C attempts to lock '32461'"); |
| } |
| |
| VM.getVM(VM_C).invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| remoteCreateService(serviceName); |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| assertThat(service.lock(objName, -1, -1)).isTrue(); |
| service.unlock(objName); |
| } |
| }); |
| } |
| |
| @Test |
| public void testNoStuckLock() { |
| final String serviceName = this.getUniqueName(); |
| final Object keyWithLease = "key-with-lease"; |
| final Object keyNoLease = "key-no-lease"; |
| |
| // Create service in this VM |
| DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| |
| assertThat(service.lock(keyWithLease, -1, -1)).isTrue(); |
| service.unlock(keyWithLease); |
| |
| assertThat(service.lock(keyNoLease, -1, -1)).isTrue(); |
| service.unlock(keyNoLease); |
| |
| // Create service in other VM |
| VM otherVm = VM.getVM(0); |
| otherVm.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| DistributedLockService service2 = DistributedLockService.create(serviceName, dlstSystem); |
| service2.lock(keyWithLease, -1, 360000); |
| service2.lock(keyNoLease, -1, -1); |
| disconnectFromDS(); |
| } |
| }); |
| |
| // Now lock back in this VM... no stuck locks anymore |
| assertThat(service.lock(keyWithLease, -1, -1)).isTrue(); |
| service.unlock(keyWithLease); |
| assertThat(service.lock(keyNoLease, -1, -1)).isTrue(); |
| service.unlock(keyNoLease); |
| } |
| |
| volatile boolean startedThread1_testReleaseOrphanedGrant; |
| volatile boolean releaseThread1_testReleaseOrphanedGrant; |
| volatile boolean startedThread2_testReleaseOrphanedGrant; |
| volatile boolean gotLockThread2_testReleaseOrphanedGrant; |
| |
| /** |
| * Client requests lock and then interrupts lock request before processing the grant reply. This |
| * causes the Client to send a release msg to the grantor. |
| */ |
| @Test |
| public void testReleaseOrphanedGrant_Local() throws Exception { |
| DLockRequestProcessor.setDebugReleaseOrphanedGrant(true); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(false); |
| try { |
| startedThread2_testReleaseOrphanedGrant = false; |
| gotLockThread2_testReleaseOrphanedGrant = false; |
| releaseThread1_testReleaseOrphanedGrant = false; |
| |
| logger.info("[testReleaseOrphanedGrant_Local] create lock service"); |
| final String serviceName = getUniqueName(); |
| final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| |
| // thread to get lock and wait and then unlock |
| final Thread thread1 = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| logger.info("[testReleaseOrphanedGrant_Local] get the lock"); |
| assertThat(service.lock("obj", -1, -1)).isTrue(); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(true); |
| startedThread1_testReleaseOrphanedGrant = true; |
| synchronized (Thread.currentThread()) { |
| while (!releaseThread1_testReleaseOrphanedGrant) { |
| try { |
| Thread.currentThread().wait(); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| } |
| } |
| logger.info("[testReleaseOrphanedGrant_Local] unlock the lock"); |
| service.unlock("obj"); |
| } |
| }); |
| thread1.start(); |
| while (!startedThread1_testReleaseOrphanedGrant) { |
| Thread.yield(); |
| } |
| |
| // thread to interrupt lockInterruptibly call to cause zombie grant |
| final Thread thread2 = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| logger |
| .info("[testReleaseOrphanedGrant_Local] call lockInterruptibly"); |
| startedThread2_testReleaseOrphanedGrant = true; |
| assertThat(service.lockInterruptibly("obj", -1, -1)).isFalse(); |
| } catch (InterruptedException expected) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| }); |
| thread2.start(); |
| while (!startedThread2_testReleaseOrphanedGrant) { |
| Thread.yield(); |
| } |
| |
| // release first thread to unlock |
| logger.info("[testReleaseOrphanedGrant_Local] release 1st thread"); |
| Thread.sleep(500); |
| synchronized (thread1) { |
| releaseThread1_testReleaseOrphanedGrant = true; |
| thread1.notifyAll(); |
| } |
| Thread.sleep(500); |
| |
| // while first thread is stuck on waitToProcessDLockResponse, |
| // interrupt 2nd thread |
| logger.info("[testReleaseOrphanedGrant_Local] interrupt 2nd thread"); |
| thread2.interrupt(); |
| ThreadUtils.join(thread2, 20 * 1000); |
| |
| // release waitToProcessDLockResponse |
| logger.info("[testReleaseOrphanedGrant_Local] process lock response"); |
| Thread.sleep(500); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(false); |
| |
| // relock obj to make sure zombie release worked |
| logger.info("[testReleaseOrphanedGrant_Local] verify lock not held"); |
| assertThat(service.lock("obj", 1000, -1)).isTrue(); |
| } finally { |
| DLockRequestProcessor.setDebugReleaseOrphanedGrant(false); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(false); |
| } |
| } |
| |
| private static volatile Thread threadVM1_testReleaseOrphanedGrant_Remote; |
| private static volatile Thread threadVM2_testReleaseOrphanedGrant_Remote; |
| private static volatile boolean startedThreadVM1_testReleaseOrphanedGrant_Remote; |
| private static volatile boolean releaseThreadVM1_testReleaseOrphanedGrant_Remote; |
| private static volatile boolean unlockedThreadVM1_testReleaseOrphanedGrant_Remote; |
| private static volatile boolean startedThreadVM2_testReleaseOrphanedGrant_Remote; |
| |
| @Test |
| public void testReleaseOrphanedGrant_Remote() throws Exception { |
| doTestReleaseOrphanedGrant_Remote(false); |
| } |
| |
| @Test |
| public void testReleaseOrphanedGrant_RemoteWithDestroy() throws Exception { |
| doTestReleaseOrphanedGrant_Remote(true); |
| } |
| |
| private void doTestReleaseOrphanedGrant_Remote(final boolean destroyLockService) |
| throws InterruptedException { |
| final VM vm1 = VM.getVM(0); |
| final VM vm2 = VM.getVM(1); |
| |
| try { |
| logger.info("[testReleaseOrphanedGrant_Remote] create lock service"); |
| final String serviceName = getUniqueName(); |
| final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| |
| // lock and unlock to make sure this vm is grantor |
| assertThat(service.lock("obj", -1, -1)).isTrue(); |
| service.unlock("obj"); |
| |
| // thread to get lock and wait and then unlock |
| vm1.invokeAsync(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testReleaseOrphanedGrant_Remote] get the lock"); |
| threadVM1_testReleaseOrphanedGrant_Remote = Thread.currentThread(); |
| connectDistributedSystem(); |
| DistributedLockService service_vm1 = |
| DistributedLockService.create(serviceName, getSystem()); |
| assertThat(service_vm1.lock("obj", -1, -1)).isTrue(); |
| synchronized (threadVM1_testReleaseOrphanedGrant_Remote) { |
| while (!releaseThreadVM1_testReleaseOrphanedGrant_Remote) { |
| try { |
| startedThreadVM1_testReleaseOrphanedGrant_Remote = true; |
| Thread.currentThread().wait(); |
| } catch (InterruptedException ignore) { |
| fail("interrupted"); |
| } |
| } |
| } |
| logger.info("[testReleaseOrphanedGrant_Remote] unlock the lock"); |
| service_vm1.unlock("obj"); |
| unlockedThreadVM1_testReleaseOrphanedGrant_Remote = true; |
| } |
| }); |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| while (!startedThreadVM1_testReleaseOrphanedGrant_Remote) { |
| Thread.yield(); |
| } |
| } |
| }); |
| Thread.sleep(500); |
| |
| // thread to interrupt lockInterruptibly call to cause zombie grant |
| vm2.invokeAsync(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger |
| .info("[testReleaseOrphanedGrant_Remote] call lockInterruptibly"); |
| threadVM2_testReleaseOrphanedGrant_Remote = Thread.currentThread(); |
| DistributedLockService service_vm2 = |
| DistributedLockService.create(serviceName, getSystem()); |
| startedThreadVM2_testReleaseOrphanedGrant_Remote = true; |
| try { |
| DLockRequestProcessor.setDebugReleaseOrphanedGrant(true); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(true); |
| assertThat(service_vm2.lockInterruptibly("obj", -1, -1)).isFalse(); |
| } catch (InterruptedException expected) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| }); |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| while (!startedThreadVM2_testReleaseOrphanedGrant_Remote) { |
| Thread.yield(); |
| } |
| } |
| }); |
| Thread.sleep(500); |
| |
| // release first thread to unlock |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger |
| .info("[testReleaseOrphanedGrant_Remote] release 1st thread"); |
| synchronized (threadVM1_testReleaseOrphanedGrant_Remote) { |
| releaseThreadVM1_testReleaseOrphanedGrant_Remote = true; |
| threadVM1_testReleaseOrphanedGrant_Remote.notifyAll(); |
| } |
| } |
| }); |
| Thread.sleep(500); // lock is being released, grantor will grant lock to vm2 |
| |
| // while first thread is stuck on waitToProcessDLockResponse, |
| // interrupt 2nd thread |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger |
| .info("[testReleaseOrphanedGrant_Remote] interrupt 2nd thread"); |
| threadVM2_testReleaseOrphanedGrant_Remote.interrupt(); |
| ThreadUtils.join(threadVM2_testReleaseOrphanedGrant_Remote, 5 * 60 * 1000); |
| if (destroyLockService) { |
| logger |
| .info("[testReleaseOrphanedGrant_Remote] destroy lock service"); |
| DistributedLockService.destroy(serviceName); |
| assertThat(DistributedLockService.getServiceNamed(serviceName)).isNull(); |
| } |
| } |
| }); |
| Thread.sleep(500); // grant is blocked while reply processor is being destroyed |
| |
| // release waitToProcessDLockResponse |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger |
| .info("[testReleaseOrphanedGrant_Remote] process lock response"); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(false); |
| } |
| }); |
| Thread.sleep(500); // process grant and send zombie release to grantor |
| |
| // relock obj to make sure zombie release worked |
| logger.info("[testReleaseOrphanedGrant_Remote] verify lock not held"); |
| assertThat(service.lock("obj", 1000, -1)).isTrue(); |
| } finally { |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger |
| .info("[testReleaseOrphanedGrant_Remote] clean up DebugReleaseOrphanedGrant"); |
| DLockRequestProcessor.setDebugReleaseOrphanedGrant(false); |
| DLockRequestProcessor.setWaitToProcessDLockResponse(false); |
| } |
| }); |
| } |
| } |
| |
| @Test |
| public void testDestroyLockServiceAfterGrantResponse() { |
| VM vm0 = VM.getVM(0); |
| |
| final String serviceName = getUniqueName(); |
| |
| vm0.invoke(() -> createLockGrantor(serviceName)); |
| |
| DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
| |
| @Override |
| public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { |
| if (message instanceof DLockResponseMessage) { |
| DistributedLockService.destroy(serviceName); |
| } |
| } |
| }); |
| |
| connectDistributedSystem(); |
| final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| try { |
| service.lock("obj", -1, -1); |
| fail("The lock service should have been destroyed"); |
| } catch (LockServiceDestroyedException expected) { |
| // Do nothing |
| } |
| |
| vm0.invoke(new SerializableRunnable("check to make sure the lock is not orphaned") { |
| |
| @Override |
| public void run() { |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| // lock and unlock to make sure this vm is grantor |
| assertThat(service.lock("obj", -1, -1)).isTrue(); |
| service.unlock("obj"); |
| } |
| }); |
| } |
| |
| @Test |
| public void testDestroyLockServiceBeforeGrantRequest() { |
| VM vm0 = VM.getVM(0); |
| |
| final String serviceName = getUniqueName(); |
| |
| vm0.invoke(() -> createLockGrantor(serviceName)); |
| |
| DistributionMessageObserver.setInstance(new DistributionMessageObserver() { |
| |
| @Override |
| public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) { |
| if (message instanceof DLockRequestMessage) { |
| DistributedLockService.destroy(serviceName); |
| } |
| } |
| }); |
| |
| connectDistributedSystem(); |
| final DistributedLockService service = DistributedLockService.create(serviceName, dlstSystem); |
| try { |
| service.lock("obj", -1, -1); |
| fail("The lock service should have been destroyed"); |
| } catch (LockServiceDestroyedException expected) { |
| // Do nothing |
| } |
| |
| vm0.invoke(new SerializableRunnable("check to make sure the lock is not orphaned") { |
| |
| @Override |
| public void run() { |
| final DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| |
| // lock and unlock to make sure this vm is grantor |
| assertThat(service.lock("obj", -1, -1)).isTrue(); |
| service.unlock("obj"); |
| } |
| }); |
| } |
| |
| private static void createLockGrantor(String serviceName) { |
| connectDistributedSystem(); |
| final DistributedLockService service = |
| DistributedLockService.create(serviceName, dlstSystem); |
| |
| // lock and unlock to make sure this vm is grantor |
| // implementation detail: as long as this VM gets the first lock, it will be the grantor |
| assertThat(service.lock("obj", -1, -1)).isTrue(); |
| service.unlock("obj"); |
| } |
| |
| ////////// Private test methods |
| |
| protected synchronized boolean getDone() { |
| return done; |
| } |
| |
| protected synchronized void setDone(boolean done) { |
| this.done = done; |
| } |
| |
| private synchronized boolean getGot() { |
| return got; |
| } |
| |
| protected synchronized void setGot(boolean got) { |
| this.got = got; |
| } |
| |
| protected static Boolean lock(String serviceName, Object name) { |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| boolean locked = service.lock(name, 1000, -1); |
| return locked; |
| } |
| |
| protected static Boolean tryLock(String serviceName, Object name, Long wait) { |
| DLockService service = DLockService.getInternalServiceNamed(serviceName); |
| boolean locked = service.lock(name, wait, -1, true); |
| return locked; |
| } |
| |
| protected static Boolean unlock(String serviceName, Object name) { |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| try { |
| service.unlock(name); |
| return Boolean.TRUE; |
| } catch (LockNotHeldException e) { |
| return Boolean.FALSE; |
| } catch (Exception e) { |
| e.printStackTrace(); |
| return Boolean.FALSE; |
| } |
| } |
| |
| private static Boolean tryToLock(String serviceName) { |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| boolean locked = service.lock("obj", 1000, -1); |
| if (locked) { |
| service.unlock("obj"); |
| } |
| return locked; |
| } |
| |
| private void doOneGetsAndOthersTimeOut(int numVMs, int numThreadsPerVM) throws Exception { |
| |
| final String serviceName = getUniqueName() + "-" + numVMs + "-" + numThreadsPerVM; |
| final String objectName = "obj"; |
| |
| System.out.println("Starting testtt " + serviceName); |
| |
| distributedCreateService(numVMs, serviceName, true); |
| hits = 0; |
| completes = 0; |
| blackboard.initCount(); |
| blackboard.setIsLocked(false); |
| |
| // tell them all to request a lock and increment |
| long timeout = 1000; |
| long holdTime = (timeout * 5); |
| final Host host = Host.getHost(0); |
| for (int vm = 0; vm < numVMs; vm++) { |
| final int finalvm = vm; |
| for (int thread = 0; thread < numThreadsPerVM; thread++) { |
| final int finalthread = thread; |
| (new Thread(() -> { |
| Boolean result = VM.getVM(finalvm) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockAndIncrement( |
| serviceName, objectName, timeout, holdTime)); |
| if (result) { |
| incHits(); |
| } |
| incCompletes(); |
| }, "doOneGetsAndOthersTimeOut-" + thread)).start(); |
| } |
| } |
| |
| // wait for timeout |
| // wait for completion or timeout |
| long start = System.currentTimeMillis(); |
| while ((completes < numVMs * numThreadsPerVM) |
| && (System.currentTimeMillis() - start < holdTime * 10)) { |
| Thread.sleep(200); |
| } |
| |
| // assert that only one got ownership |
| if (hits != 1) { |
| ThreadUtils.dumpAllStacks(); |
| } |
| assertThat(hits).isEqualTo(1) |
| .withFailMessage("number of VMs that got ownership is wrong"); |
| |
| // assert that all the others timed out |
| assertThat(completes).isEqualTo(numVMs * numThreadsPerVM) |
| .withFailMessage("number of threads that completed is wrong"); |
| |
| // Check final value of entry |
| long count = blackboard.getCount(); |
| assertThat(count).isEqualTo(1).withFailMessage("Final entry value wrong"); |
| |
| System.out.println("Done testtt " + serviceName); |
| } |
| |
| // 2, 2... expect 4, but get 3 |
| private void doOneGetsThenOtherGets(int numVMs, int numThreadsPerVM) throws Exception { |
| |
| final String serviceName = getUniqueName() + "-" + numVMs + "-" + numThreadsPerVM; |
| final String objectName = "obj"; |
| |
| System.out.println("Starting testtt " + serviceName); |
| |
| distributedCreateService(numVMs, serviceName, true); |
| hits = 0; |
| completes = 0; |
| blackboard.initCount(); |
| blackboard.setIsLocked(false); |
| |
| // tell all VMs to lock, long timeout, short hold time |
| // each one gets lock, increments and releases |
| final long timeout = Math.max(1000 * numVMs * numThreadsPerVM, 10 * 1000); // at least 10 |
| // seconds |
| |
| final Host host = Host.getHost(0); |
| for (int vm = 0; vm < numVMs; vm++) { |
| final int finalvm = vm; |
| for (int thread = 0; thread < numThreadsPerVM; thread++) { |
| final int finalthread = thread; |
| (new Thread(() -> { |
| |
| System.out.println("VM " + finalvm + ", thread " + finalthread + " in " + serviceName |
| + " about to invoke"); |
| |
| Boolean result = null; |
| try { |
| result = VM.getVM(finalvm) |
| .invoke(() -> DistributedLockServiceDUnitTest.getLockAndIncrement( |
| serviceName, objectName, timeout, 0L)); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| System.out.println("VM " + finalvm + ", thread " + finalthread + " in " + serviceName |
| + " got result " + result); |
| if (result) { |
| incHits(); |
| } |
| incCompletes(); |
| })).start(); |
| } |
| } |
| |
| // wait for completion or timeout |
| long start = System.currentTimeMillis(); |
| while (completes < numVMs * numThreadsPerVM) { |
| if (!(System.currentTimeMillis() - start < timeout * 2)) { |
| System.out.println("Test serviceName timed out"); |
| break; |
| } |
| Thread.sleep(200); |
| } |
| |
| // assert that all completed |
| assertThat(completes).isEqualTo(numVMs * numThreadsPerVM) |
| .withFailMessage("number of threads that completed is wrong"); |
| // ------------------------------------------------------- |
| |
| // assert that all were able to lock |
| if (hits != numVMs * numThreadsPerVM) { |
| ThreadUtils.dumpAllStacks(); |
| } |
| assertThat(hits).isEqualTo(numVMs * numThreadsPerVM) |
| .withFailMessage("number of VMs that got ownership is wrong"); |
| |
| // Check final value of entry |
| long count = blackboard.getCount(); |
| assertThat(count).isEqualTo(numVMs * numThreadsPerVM) |
| .withFailMessage("Blackboard.getCount() wrong"); |
| |
| System.out.println("Done testtt " + serviceName); |
| |
| } |
| |
| @Test |
| public void testTokenCleanup() { |
| final String dlsName = getUniqueName(); |
| |
| final VM vmGrantor = VM.getVM(0); |
| final VM vm1 = VM.getVM(1); |
| // final VM vm2 =VM.getVM(2); |
| |
| final String key1 = "key1"; |
| |
| // vmGrantor creates grantor |
| vmGrantor.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testTokenCleanup] vmGrantor creates grantor"); |
| connectDistributedSystem(); |
| DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem()); |
| |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); |
| assertThat(dls.isLockGrantor()).isTrue(); |
| assertThat(dls.getToken(key1)).isNotNull(); |
| |
| dls.unlock(key1); |
| assertThat(dls.getToken(key1)).isNotNull(); |
| |
| // token should be removed when freeResources is called |
| dls.freeResources(key1); |
| // assertThat(dls.getToken(key1)).isNull(); |
| |
| DLockToken token = dls.getToken(key1); |
| assertThat(token).withFailMessage("Failed with bug 38180: " + token).isNull(); |
| |
| // make sure there are NO tokens at all |
| Collection tokens = dls.getTokens(); |
| assertThat(tokens.size()).isEqualTo(0) |
| .withFailMessage("Failed with bug 38180: tokens=" + tokens); |
| } |
| }); |
| |
| // vm1 locks and frees key1 |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testTokenCleanup] vm1 locks key1"); |
| connectDistributedSystem(); |
| DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem()); |
| |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); |
| assertThat(dls.isLockGrantor()).isFalse(); |
| assertThat(dls.getToken(key1)).isNotNull(); |
| |
| dls.unlock(key1); |
| assertThat(dls.getToken(key1)).isNotNull(); |
| |
| dls.freeResources(key1); |
| // assertThat(dls.getToken(key1)).isNull(); |
| |
| DLockToken token = dls.getToken(key1); |
| assertThat(token).withFailMessage("Failed with bug 38180: " + token).isNull(); |
| |
| // make sure there are NO tokens at all |
| Collection tokens = dls.getTokens(); |
| assertThat(tokens.size()).isEqualTo(0) |
| .withFailMessage("Failed with bug 38180: tokens=" + tokens); |
| } |
| }); |
| |
| // vm1 tests recursion |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testTokenCleanup] vm1 tests recursion"); |
| connectDistributedSystem(); |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName); |
| |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); // 1 |
| assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(1); |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); // 2 |
| assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(2); |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); // 3 |
| assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(3); |
| |
| DLockToken token0 = dls.getToken(key1); |
| assertThat(token0).isNotNull(); |
| Collection tokens = dls.getTokens(); |
| assertThat(tokens.contains(token0)).isTrue(); |
| assertThat(tokens.size()).isEqualTo(1); |
| |
| dls.unlock(key1); // 1 |
| assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(2); |
| dls.freeResources(key1); |
| |
| DLockToken token1 = dls.getToken(key1); |
| assertThat(token1).isNotNull(); |
| assertThat(token1).isEqualTo(token0); |
| tokens = dls.getTokens(); |
| assertThat(tokens.contains(token1)).isTrue(); |
| assertThat(tokens.size()).isEqualTo(1); |
| |
| dls.unlock(key1); // 2 |
| assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(1); |
| dls.freeResources(key1); |
| assertThat(dls.getToken(key1)).isNotNull(); |
| |
| DLockToken token2 = dls.getToken(key1); |
| assertThat(token2).isNotNull(); |
| assertThat(token2).isEqualTo(token0); |
| tokens = dls.getTokens(); |
| assertThat(tokens.contains(token2)).isTrue(); |
| assertThat(tokens.size()).isEqualTo(1); |
| |
| dls.unlock(key1); // 3 |
| assertThat(dls.getToken(key1).getUsageCount()).isEqualTo(0); |
| dls.freeResources(key1); |
| |
| DLockToken token3 = dls.getToken(key1); |
| assertThat(token3).withFailMessage("Failed with bug 38180: " + token3).isNull(); |
| |
| // make sure there are NO tokens at all |
| tokens = dls.getTokens(); |
| assertThat(tokens.size()).isEqualTo(0) |
| .withFailMessage("Failed with bug 38180: tokens=" + tokens); |
| } |
| }); |
| } |
| |
| @Test |
| public void testGrantTokenCleanup() { |
| final String dlsName = getUniqueName(); |
| |
| final VM vmGrantor = VM.getVM(0); |
| final VM vm1 = VM.getVM(1); |
| |
| final String key1 = "key1"; |
| |
| // vmGrantor creates grantor |
| vmGrantor.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testGrantTokenCleanup] vmGrantor creates grantor"); |
| connectDistributedSystem(); |
| DistributedLockService dls = DLockService.create(dlsName, getSystem(), true, true, true); |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); |
| assertThat(dls.isLockGrantor()).isTrue(); |
| DLockGrantor grantor = ((DLockService) dls).getGrantor(); |
| assertThat(grantor).isNotNull(); |
| DLockGrantor.DLockGrantToken grantToken = grantor.getGrantToken(key1); |
| assertThat(grantToken).isNotNull(); |
| logger.info("[testGrantTokenCleanup] vmGrantor unlocks key1"); |
| dls.unlock(key1); |
| assertThat(grantor.getGrantToken(key1)).isNull(); |
| } |
| }); |
| |
| // vm1 locks and frees key1 |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testTokenCleanup] vm1 locks key1"); |
| connectDistributedSystem(); |
| DLockService dls = |
| (DLockService) DLockService.create(dlsName, getSystem(), true, true, false); |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); |
| |
| logger.info("[testTokenCleanup] vm1 frees key1"); |
| dls.unlock(key1); |
| |
| // Without automateFreeResources, token for key1 still exists until freeResources is called |
| assertThat(dls.getToken(key1)).isNotNull(); |
| dls.freeResources(key1); |
| |
| // make sure token for key1 is gone |
| DLockToken token = dls.getToken(key1); |
| assertThat(token).withFailMessage("token should have been cleaned up").isNull(); |
| |
| // make sure there are NO tokens at all |
| Collection tokens = dls.getTokens(); |
| assertThat(tokens.size()).isEqualTo(0) |
| .withFailMessage("There should be no tokens"); |
| } |
| }); |
| |
| // vmGrantor frees key1 |
| vmGrantor.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testTokenCleanup] vmGrantor frees key1"); |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName); |
| |
| // Because automateFreeResources is true, DLockToken and DLockGrantToken should have been |
| // removed when vm1 unlocked key1 |
| |
| // make sure token for key1 is gone |
| DLockToken token = dls.getToken(key1); |
| assertThat(token).withFailMessage("token should have been cleaned up").isNull(); |
| |
| // make sure there are NO tokens at all |
| Collection tokens = dls.getTokens(); |
| assertThat(tokens.size()).isEqualTo(0) |
| .withFailMessage("There should be no tokens"); |
| |
| // make sure there are NO grant tokens at all |
| DLockGrantor grantor = dls.getGrantor(); |
| Collection grantTokens = grantor.getGrantTokens(); |
| assertThat(grantTokens.size()).isEqualTo(0) |
| .withFailMessage("There should be no tokens"); |
| } |
| }); |
| } |
| |
| private static final AtomicBoolean testLockQuery_whileVM1Locks = new AtomicBoolean(); |
| |
| @Test |
| public void testLockQuery() { |
| final String dlsName = getUniqueName(); |
| |
| final VM vmGrantor = VM.getVM(0); |
| final VM vm1 = VM.getVM(1); |
| final VM vm2 = VM.getVM(2); |
| |
| final String key1 = "key1"; |
| |
| // vmGrantor creates grantor |
| vmGrantor.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vmGrantor creates grantor"); |
| connectDistributedSystem(); |
| DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem()); |
| |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); |
| assertThat(dls.isLockGrantor()).isTrue(); |
| dls.unlock(key1); |
| dls.freeResources(key1); |
| } |
| }); |
| |
| AsyncInvocation whileVM1Locks = null; |
| try { |
| // vm1 locks key1 |
| whileVM1Locks = vm1.invokeAsync(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vm1 locks key1"); |
| connectDistributedSystem(); |
| DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem()); |
| |
| assertThat(dls.lock(key1, -1, -1)).isTrue(); |
| assertThat(dls.isLockGrantor()).isFalse(); |
| |
| try { |
| synchronized (testLockQuery_whileVM1Locks) { |
| testLockQuery_whileVM1Locks.set(true); |
| testLockQuery_whileVM1Locks.notifyAll(); |
| long maxWait = 10000; |
| StopWatch timer = new StopWatch(true); |
| while (testLockQuery_whileVM1Locks.get()) { // while true |
| long timeLeft = maxWait - timer.elapsedTimeMillis(); |
| if (timeLeft > 0) { |
| testLockQuery_whileVM1Locks.wait(timeLeft); |
| } else { |
| fail("Test attempted to wait too long"); |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| org.apache.geode.test.dunit.Assert.fail(e.getMessage(), e); |
| } |
| |
| logger.info("[testLockQuery] vm1 unlocks key1"); |
| dls.unlock(key1); |
| dls.freeResources(key1); |
| } |
| }); |
| |
| // wait for vm1 to set testLockQuery_whileVM1Locks |
| // get DistributedMember for vm1 |
| final DistributedMember vm1Member = vm1.invoke(() -> { |
| logger.info("[testLockQuery] vm1 waits for locking thread"); |
| synchronized (testLockQuery_whileVM1Locks) { |
| long maxWait = 10000; |
| StopWatch timer = new StopWatch(true); |
| while (!testLockQuery_whileVM1Locks.get()) { // while false |
| long timeLeft = maxWait - timer.elapsedTimeMillis(); |
| if (timeLeft > 0) { |
| testLockQuery_whileVM1Locks.wait(timeLeft); |
| } else { |
| fail("Test attempted to wait too long"); |
| } |
| } |
| } |
| return getSystem().getDistributedMember(); |
| }); |
| assertThat(vm1Member).isNotNull(); |
| |
| // vmGrantor tests positive local dlock query |
| vmGrantor.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vmGrantor tests local query"); |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName); |
| |
| DLockRemoteToken result = dls.queryLock(key1); |
| assertThat(result).isNotNull(); |
| assertThat(result.getName()).isEqualTo(key1); |
| assertThat(result.getLeaseId() != -1).isTrue(); |
| assertThat(result.getLeaseExpireTime()).isEqualTo(MAX_VALUE); |
| RemoteThread lesseeThread = result.getLesseeThread(); |
| assertThat(lesseeThread).isNotNull(); |
| assertThat(lesseeThread.getDistributedMember()).isEqualTo(vm1Member); |
| assertThat(result.getLessee()).isEqualTo(vm1Member); |
| // nothing to test for on threadId unless we serialize info from vm1 |
| } |
| }); |
| |
| // vm2 tests positive remote dlock query |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vm2 tests remote query"); |
| connectDistributedSystem(); |
| DLockService dls = (DLockService) DistributedLockService.create(dlsName, getSystem()); |
| |
| DLockRemoteToken result = dls.queryLock(key1); |
| assertThat(result).isNotNull(); |
| assertThat(result.getName()).isEqualTo(key1); |
| assertThat(result.getLeaseId() != -1).isTrue(); |
| assertThat(result.getLeaseExpireTime()).isEqualTo(MAX_VALUE); |
| RemoteThread lesseeThread = result.getLesseeThread(); |
| assertThat(lesseeThread).isNotNull(); |
| assertThat(lesseeThread.getDistributedMember()).isEqualTo(vm1Member); |
| assertThat(result.getLessee()).isEqualTo(vm1Member); |
| // nothing to test for on threadId unless we serialize info from vm1 |
| } |
| }); |
| |
| } finally { // guarantee that testLockQuery_whileVM1Locks is notfied! |
| // vm1 sets and notifies testLockQuery_whileVM1Locks to release lock |
| vm1.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vm1 notifies/releases key1"); |
| synchronized (testLockQuery_whileVM1Locks) { |
| testLockQuery_whileVM1Locks.set(false); |
| testLockQuery_whileVM1Locks.notifyAll(); |
| } |
| } |
| }); |
| |
| ThreadUtils.join(whileVM1Locks, 10 * 1000); |
| if (whileVM1Locks.exceptionOccurred()) { |
| org.apache.geode.test.dunit.Assert.fail("Test failed", whileVM1Locks.getException()); |
| } |
| } |
| |
| // vmGrantor tests negative local dlock query |
| vmGrantor.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vmGrantor tests negative query"); |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName); |
| |
| DLockRemoteToken result = dls.queryLock(key1); |
| assertThat(result).isNotNull(); |
| assertThat(result.getName()).isEqualTo(key1); |
| assertThat(result.getLeaseId()).isEqualTo(-1); |
| assertThat(result.getLeaseExpireTime()).isEqualTo(0); |
| assertThat(result.getLesseeThread()).isNull(); |
| assertThat(result.getLessee()).isNull(); |
| } |
| }); |
| |
| // vm2 tests negative remote dlock query |
| vm2.invoke(new SerializableRunnable() { |
| @Override |
| public void run() { |
| logger.info("[testLockQuery] vm2 tests negative query"); |
| DLockService dls = (DLockService) DistributedLockService.getServiceNamed(dlsName); |
| |
| DLockRemoteToken result = dls.queryLock(key1); |
| assertThat(result).isNotNull(); |
| assertThat(result.getName()).isEqualTo(key1); |
| assertThat(result.getLeaseId()).isEqualTo(-1); |
| assertThat(result.getLeaseExpireTime()).isEqualTo(0); |
| assertThat(result.getLesseeThread()).isNull(); |
| assertThat(result.getLessee()).isNull(); |
| } |
| }); |
| |
| } |
| |
| ////////// Support methods |
| |
| private List<VM> distributedCreateService(int numVMs, String serviceName, boolean useLocalVM) { |
| // create an entry - use scope DIST_ACK, not GLOBAL, since we're testing |
| // that explicit use of the ownership api provides the synchronization |
| |
| final List<VM> vms = new ArrayList<>(); |
| |
| if (useLocalVM) { |
| vms.add(VM.getVM(VM.getCurrentVMNum())); |
| remoteCreateService(serviceName); |
| } |
| |
| vms.addAll(forNumVMsInvoke(numVMs, () -> remoteCreateService(serviceName))); |
| // remoteCreateService(serviceName); |
| |
| return vms; |
| |
| |
| } |
| |
| /** |
| * Creates a new DistributedLockService in a remote VM. |
| * |
| * @param name The name of the newly-created DistributedLockService. It is recommended that the |
| * name of the Region be the {@link #getUniqueName()} of the test, or at least derive from |
| * it. |
| */ |
| protected static void remoteCreateService(String name) { |
| DistributedLockService newService = DistributedLockService.create(name, dlstSystem); |
| System.out.println("Created " + newService); |
| } |
| |
| private static Boolean getLockAndIncrement(String serviceName, Object objectName, long timeout, |
| long holdTime) throws Exception { |
| System.out.println("[getLockAndIncrement] In getLockAndIncrement"); |
| DistributedLockService service = DistributedLockService.getServiceNamed(serviceName); |
| boolean got = service.lock(objectName, timeout, -1); |
| System.out.println("[getLockAndIncrement] In getLockAndIncrement - got is " + got); |
| if (got) { |
| // Make sure we don't think anyone else is holding the lock |
| if (blackboard.getIsLocked()) { |
| String msg = "obtained lock on " + serviceName + "/" + objectName |
| + " but blackboard was locked, grantor=" + ((DLockService) service).getLockGrantorId() |
| + ", isGrantor=" + service.isLockGrantor(); |
| System.out.println("[getLockAndIncrement] In getLockAndIncrement: " + msg); |
| fail(msg); |
| } |
| blackboard.setIsLocked(true); |
| long count = blackboard.getCount(); |
| System.out |
| .println("[getLockAndIncrement] In getLockAndIncrement - count is " + count + " for " |
| + serviceName + "/" + objectName); |
| Thread.sleep(holdTime); |
| blackboard.incCount(); |
| blackboard.setIsLocked(false); |
| System.out |
| .println("[getLockAndIncrement] In getLockAndIncrement: " + "cleared blackboard lock for " |
| + serviceName + "/" + objectName); |
| service.unlock(objectName); |
| } |
| System.out.println("[getLockAndIncrement] Returning from getLockAndIncrement"); |
| return got; |
| } |
| |
| private synchronized void incHits() { |
| hits = hits + 1; |
| } |
| |
| private synchronized void incCompletes() { |
| completes = completes + 1; |
| } |
| |
| /** |
| * Assumes there is only one host, and invokes the given method in the first numVMs VMs that host |
| * knows about. |
| */ |
| private List<VM> forNumVMsInvoke(int numVMs, final SerializableRunnableIF runnable) { |
| List<VM> vms = new ArrayList<>(); |
| for (int i = 0; i < numVMs; i++) { |
| VM thisVm = VM.getVM(i); |
| thisVm.invoke(runnable); |
| vms.add(thisVm); |
| } |
| |
| return vms; |
| } |
| |
| public static class BasicLockClient implements Runnable { |
| private static Logger logger = LogService.getLogger(); |
| private static final Integer LOCK = 1; |
| private static final Integer UNLOCK = 2; |
| private static final Integer SUSPEND = 3; |
| private static final Integer RESUME = 4; |
| private static final Integer STOP = 5; |
| private final Object sync = new Object(); |
| private final Thread thread; |
| private final String dlsName; |
| private final String key; |
| // ordered queue of requests |
| private final LinkedList requests = new LinkedList(); |
| // map of requests to operations |
| private final Map operationsMap = new HashMap(); |
| // map of requests to throwables |
| private final Map throwables = new HashMap(); |
| private final Set completedRequests = new HashSet(); |
| private int latestRequest = 0; |
| private boolean stayinAlive = true; |
| |
| public BasicLockClient(String dlsName, String key) { |
| this.dlsName = dlsName; |
| this.key = key; |
| this.thread = new Thread(this); |
| this.thread.start(); |
| } |
| |
| @Override |
| public void run() { |
| logger.info("BasicLockClient running"); |
| while (this.stayinAlive) { |
| synchronized (this.sync) { |
| if (this.requests.size() > 0) { |
| Integer requestId = (Integer) this.requests.removeFirst(); |
| Integer operationId = (Integer) this.operationsMap.get(requestId); |
| try { |
| switch (operationId) { |
| case 1: |
| logger.info("BasicLockClient lock"); |
| assertThat(DistributedLockService.getServiceNamed(dlsName).lock(key, -1, -1)) |
| .isTrue(); |
| break; |
| case 2: |
| logger.info("BasicLockClient unlock"); |
| DistributedLockService.getServiceNamed(dlsName).unlock(key); |
| break; |
| case 3: |
| logger.info("BasicLockClient suspendLocking"); |
| assertThat(DistributedLockService.getServiceNamed(dlsName).suspendLocking(-1)) |
| .isTrue(); |
| break; |
| case 4: |
| logger.info("BasicLockClient resumeLocking"); |
| DistributedLockService.getServiceNamed(dlsName).resumeLocking(); |
| break; |
| case 5: |
| logger.info("BasicLockClient stopping"); |
| this.stayinAlive = false; |
| break; |
| } // switch |
| } // try |
| catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| this.throwables.put(requestId, t); |
| } finally { |
| this.completedRequests.add(requestId); |
| this.sync.notify(); |
| } |
| } |
| try { |
| this.sync.wait(); |
| } catch (InterruptedException e) { |
| logger.info("BasicLockClient interrupted"); |
| this.stayinAlive = false; |
| } |
| } // sync |
| } // while |
| } |
| |
| public void lock() throws Error { |
| doLock(LOCK); |
| } |
| |
| public void unlock() throws Error { |
| doLock(UNLOCK); |
| } |
| |
| public void suspend() throws Error { |
| doLock(SUSPEND); |
| } |
| |
| public void resume() throws Error { |
| doLock(RESUME); |
| } |
| |
| public void stop() throws Error { |
| doLock(STOP); |
| } |
| |
| private void doLock(Integer lock) { |
| try { |
| synchronized (this.sync) { |
| this.latestRequest++; |
| Integer requestId = this.latestRequest; |
| this.operationsMap.put(requestId, lock); |
| this.requests.add(requestId); |
| this.sync.notify(); |
| long maxWait = System.currentTimeMillis() + 2000; |
| while (!this.completedRequests.contains(requestId)) { |
| long waitMillis = maxWait - System.currentTimeMillis(); |
| assertThat(waitMillis > 0).isTrue(); |
| this.sync.wait(waitMillis); |
| } |
| Throwable t = (Throwable) this.throwables.get(requestId); |
| if (t != null) { |
| throw new Error(t); |
| } |
| } |
| } catch (Exception ex) { |
| throw new Error(ex); |
| } |
| } |
| } |
| } |