blob: 06da7a03c3911e2a7de4741a6d73fd61709b928d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.locks;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor;
import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXRegionLockRequestImpl;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DLockTest;
/**
* This class tests distributed ownership via the DistributedLockService api.
*/
@Category({DLockTest.class})
public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {
// Distributed system for the current VM; assigned by connectDistributedSystem().
private static InternalDistributedSystem system;
// Set by TestDLockRecoverGrantorProcessor.process() to whether the recovery reply code was OK;
// reset to false in preTearDown().
protected static boolean testTXRecoverGrantor_replyCode_PASS = false;
// Set by TestDLockRecoverGrantorProcessor.process() to whether the recovery reply carried zero
// held locks; reset to false in preTearDown().
protected static boolean testTXRecoverGrantor_heldLocks_PASS = false;
// Cleared in preTearDown(). NOTE(review): no live code in this file assigns it a non-null value.
private InternalDistributedMember lockGrantor;
public TXLockServiceDUnitTest() {
super();
}
// -------------------------------------------------------------------------
// Lifecycle methods
// -------------------------------------------------------------------------
/**
 * Connects the VMs reached by {@code Invoke.invokeInEveryVM}, and then the current VM, to the
 * distributed system. Each VM stores its system in the static {@code system} field (see
 * {@code connectDistributedSystem()}).
 */
@Override
public final void postSetUp() throws Exception {
  // Remote VMs first, then this VM.
  Invoke.invokeInEveryVM("connectDistributedSystem", () -> {
    connectDistributedSystem();
  });
  connectDistributedSystem();
}
/**
 * Destroys the TX lock services in the remote VMs and in the current VM, then resets the
 * per-test state kept in this class. No disconnect from the distributed system is performed
 * here.
 */
@Override
public final void preTearDown() throws Exception {
  // Remote VMs: destroyServices() is looked up by name.
  Invoke.invokeInEveryVM(TXLockServiceDUnitTest.class, "destroyServices");
  // Current VM: same work as destroyServices().
  TXLockService.destroyServices();

  // Reset state shared between tests.
  testTXRecoverGrantor_heldLocks_PASS = false;
  testTXRecoverGrantor_replyCode_PASS = false;
  lockGrantor = null;
}
// -------------------------------------------------------------------------
// Test methods
// -------------------------------------------------------------------------
/**
 * Runs {@code checkGetAndDestroy} (looked up by name) in every VM of host 0.
 */
@Test
public void testGetAndDestroy() {
  final Object[] noArguments = new Object[0];
  forEachVMInvoke("checkGetAndDestroy", noArguments);
}
/** Repeats {@link #testGetAndDestroy()} as a separate test method. */
@Test
public void testGetAndDestroyAgain() {
  this.testGetAndDestroy();
}
/**
 * Calls {@code TXRecoverGrantorMessageProcessor.processDLockRecoverGrantorMessage} directly while
 * a txLock is held. Waits for the service to report that it is recovering, releases the txLock,
 * and then checks that the reply seen by {@code TestDLockRecoverGrantorProcessor} was OK and
 * listed no held locks.
 */
@Test
public void testTXRecoverGrantorMessageProcessor() throws Exception {
  TXLockService.createDTLS(system);
  checkDLockRecoverGrantorMessageProcessor();

  // Acquire a txLock over KEY-1..KEY-4 and keep holding it.
  final Map<Object, Boolean> keys = new HashMap<>();
  for (int i = 1; i <= 4; i++) {
    keys.put("KEY-" + i, true);
  }
  final List lockRequests = new ArrayList();
  lockRequests.add(
      new TXRegionLockRequestImpl(SEPARATOR + "testTXRecoverGrantorMessageProcessor", keys));
  final Set noParticipants = Collections.EMPTY_SET;
  final TXLockService dtls = TXLockService.getDTLS();
  final TXLockServiceImpl dtlsImpl = (TXLockServiceImpl) dtls;
  final TXLockId heldLockId = dtls.txLock(lockRequests, noParticipants);

  // Build the recovery message addressed to our own test reply processor.
  final DLockService dlock = dtlsImpl.getInternalDistributedLockService();
  final TestDLockRecoverGrantorProcessor replyProcessor =
      new TestDLockRecoverGrantorProcessor(dlock.getDistributionManager(), Collections.EMPTY_SET);
  final int processorId = replyProcessor.getProcessorId();
  assertEquals("No valid processorId", true, processorId > -1);
  final DLockRecoverGrantorMessage recoveryMessage = new DLockRecoverGrantorMessage();
  recoveryMessage.setServiceName(dlock.getName());
  recoveryMessage.setProcessorId(processorId);
  recoveryMessage.setSender(dlock.getDistributionManager().getId());

  // Process the message asynchronously; the test expects it to stay in progress until the txLock
  // is released below.
  final Thread recoveryThread = new Thread(() -> {
    final TXRecoverGrantorMessageProcessor processor =
        (TXRecoverGrantorMessageProcessor) dlock.getDLockRecoverGrantorMessageProcessor();
    processor.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), recoveryMessage);
  }, "TXLockServiceDUnitTest thread");
  recoveryThread.setDaemon(true);
  recoveryThread.start();

  await("waiting for recovery message to block").until(() -> dtlsImpl.isRecovering());

  dtls.release(heldLockId);

  // Once the processing thread has finished, no locks should have been reported in the reply.
  await("waiting for thread to exit").until(() -> !recoveryThread.isAlive());

  assertFalse(dtlsImpl.isRecovering());
  assertEquals("testTXRecoverGrantor_replyCode_PASS is false", true,
      testTXRecoverGrantor_replyCode_PASS);
  assertEquals("testTXRecoverGrantor_heldLocks_PASS is false", true,
      testTXRecoverGrantor_heldLocks_PASS);
}
/**
 * GEODE-2024: causes lock-grantor migration from vm0 to this VM while this VM holds a txLock, and
 * demonstrates that {@code dtls.release()} does not block forever and that recovery finishes.
 *
 * <p>
 * A {@link MessageObserver} holds up processing of the recovery message; a helper thread lifts
 * that block after five seconds while the test thread is releasing the txLock.
 */
@Test
public void testTXGrantorMigration() throws Exception {
  // first make sure some other VM is the grantor
  Host.getHost(0).getVM(0).invoke("become lock grantor", () -> {
    TXLockService.createDTLS(system);
    TXLockService vm0dtls = TXLockService.getDTLS();
    DLockService vm0dlock = ((TXLockServiceImpl) vm0dtls).getInternalDistributedLockService();
    vm0dlock.becomeLockGrantor();
  });

  TXLockService.createDTLS(system);
  checkDLockRecoverGrantorMessageProcessor();

  // get txLock and hold it
  final List regionLockReqs = new ArrayList();
  Map<Object, Boolean> keymap = new HashMap<Object, Boolean>();
  keymap.put("KEY-1", true);
  keymap.put("KEY-2", true);
  keymap.put("KEY-3", true);
  keymap.put("KEY-4", true);
  regionLockReqs
      .add(new TXRegionLockRequestImpl(SEPARATOR + "testTXRecoverGrantorMessageProcessor2",
          keymap));
  TXLockService dtls = TXLockService.getDTLS();
  TXLockId txLockId = dtls.txLock(regionLockReqs, Collections.EMPTY_SET);
  final DLockService dlock = ((TXLockServiceImpl) dtls).getInternalDistributedLockService();

  // GEODE-2024: now cause grantor migration while holding the recoveryReadLock.
  // It will lock up in TXRecoverGrantorMessageProcessor until the recoveryReadLock
  // is released. Demonstrate that dtls.release() does not block forever and releases the
  // recoveryReadLock, allowing grantor migration to finish.

  // create an observer that will block recovery messages from being processed
  MessageObserver observer = new MessageObserver();
  DistributionMessageObserver.setInstance(observer);
  try {
    System.out.println("starting thread to take over being lock grantor from vm0");
    // become the grantor - this will block waiting for a reply to the message blocked by the
    // observer
    Thread thread = new Thread(() -> {
      dlock.becomeLockGrantor();
    });
    thread.setName("TXLockServiceDUnitTest thread2");
    thread.setDaemon(true);
    thread.start();

    await("waiting for recovery to begin").until(() -> {
      return observer.isPreventingProcessing();
    });

    // spawn a thread that will unblock message processing
    // so that TXLockServiceImpl's "recovering" variable will be set
    System.out.println("starting a thread to unblock recovery in 5 seconds");
    Thread unblockThread = new Thread(() -> {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // Keep the interrupt status and the original exception instead of discarding them.
        Thread.currentThread().interrupt();
        throw new RuntimeException("sleep interrupted", e);
      }
      System.out.println("releasing block of recovery message processing");
      observer.releasePreventionOfProcessing();
    });
    unblockThread.setName("TXLockServiceDUnitTest unblockThread");
    unblockThread.setDaemon(true);
    unblockThread.start();

    // release txLock - this will block until unblockThread tells the observer
    // that it can process its message. Then it should release the recovery read-lock
    // allowing the grantor to finish recovery
    System.out.println("releasing transaction locks, which should block for a bit");
    dtls.release(txLockId);

    await("waiting for recovery to finish").until(() -> {
      return !((TXLockServiceImpl) dtls).isRecovering();
    });
  } finally {
    // Always lift the block and uninstall the observer, even if the test failed part-way.
    observer.releasePreventionOfProcessing();
    DistributionMessageObserver.setInstance(null);
  }
}
/**
 * Observer that holds up processing of {@code DLockRecoverGrantorMessage}s until
 * {@link #releasePreventionOfProcessing()} is called. Other message types pass straight through.
 */
static class MessageObserver extends DistributionMessageObserver {
  // Single-element arrays are used as both the flag holder and the monitor guarding it.
  // [0] becomes true once a recovery message has reached beforeProcessMessage().
  final boolean[] preventingMessageProcessing = new boolean[] {false};
  // [0] stays true while recovery messages must be held back.
  final boolean[] preventMessageProcessing = new boolean[] {true};

  /**
   * @return true once a recovery message has been intercepted by this observer
   */
  public boolean isPreventingProcessing() {
    synchronized (preventingMessageProcessing) {
      return preventingMessageProcessing[0];
    }
  }

  /**
   * Allows intercepted (and future) recovery messages to be processed.
   */
  public void releasePreventionOfProcessing() {
    synchronized (preventMessageProcessing) {
      preventMessageProcessing[0] = false;
      // Wake any waiter now rather than leaving it to the 50 ms poll timeout.
      preventMessageProcessing.notifyAll();
    }
  }

  /**
   * Blocks the calling thread while the message is a {@code DLockRecoverGrantorMessage} and
   * processing has not been released.
   *
   * @throws RuntimeException if the thread is interrupted while waiting; the interrupt status is
   *         restored and the {@code InterruptedException} is attached as the cause
   */
  @Override
  public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
    if (!(message instanceof DLockRecoverGrantorMessage)) {
      return;
    }
    synchronized (preventingMessageProcessing) {
      preventingMessageProcessing[0] = true;
    }
    synchronized (preventMessageProcessing) {
      while (preventMessageProcessing[0]) {
        try {
          // Bounded wait, so the flag is re-checked at least every 50 ms.
          preventMessageProcessing.wait(50);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("sleep interrupted", e);
        }
      }
    }
  }
}
/**
 * TXLockId from the most recent successful txLock call in {@link #testTXLock()}. It is assigned
 * inside one remote invocation and read by a later invocation, in the same VM, that releases the
 * lock.
 */
protected static volatile TXLockId testTXLock_TXLockId;
/**
 * Verifies that overlapping txLocks conflict: clientA acquires a txLock over two regions' keys,
 * clientB's request for the same keys fails with {@link CommitConflictException}, and once
 * clientA has released the lock clientB can acquire and release it.
 */
@Test
public void testTXLock() {
LogWriterUtils.getLogWriter().info("[testTXLock]");
// VM indexes on host 0 for each role
final int grantorVM = 0;
final int clientA = 1;
final int clientB = 2;
final Set participants = Collections.EMPTY_SET;
// the same lock requests are used by both clients, so their txLocks overlap
final List regionLockReqs = new ArrayList();
Map<Object, Boolean> keymap1 = new HashMap<Object, Boolean>();
keymap1.put("KEY-1", true);
keymap1.put("KEY-2", true);
keymap1.put("KEY-3", true);
keymap1.put("KEY-4", true);
Map<Object, Boolean> keymap2 = new HashMap<Object, Boolean>();
keymap2.put("KEY-A", true);
keymap2.put("KEY-B", true);
keymap2.put("KEY-C", true);
keymap2.put("KEY-D", true);
regionLockReqs.add(new TXRegionLockRequestImpl(SEPARATOR + "testTXLock1", keymap1));
regionLockReqs.add(new TXRegionLockRequestImpl(SEPARATOR + "testTXLock2", keymap2));
// create grantor
LogWriterUtils.getLogWriter().info("[testTXLock] create grantor");
Host.getHost(0).getVM(grantorVM).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
// NOTE(review): short pause, presumably to let the grantor VM's service settle - confirm
sleep(20);
// create client and request txLock
LogWriterUtils.getLogWriter().info("[testTXLock] create clientA and request txLock");
Host.getHost(0).getVM(clientA).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
Host.getHost(0).getVM(clientA)
.invoke(new SerializableRunnable("[testTXLock] create clientA and request txLock") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
// remember the lock id in clientA's VM so a later invocation can release it
testTXLock_TXLockId = dtls.txLock(regionLockReqs, participants);
assertNotNull("testTXLock_TXLockId is null", testTXLock_TXLockId);
}
});
// create another client and request an overlapping txLock... verify that it fails
LogWriterUtils.getLogWriter().info("[testTXLock] create clientB and fail txLock");
Host.getHost(0).getVM(clientB).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
Host.getHost(0).getVM(clientB).invoke(new SerializableRunnable() {
@Override
public void run() {
try {
TXLockService dtls = TXLockService.getDTLS();
dtls.txLock(regionLockReqs, participants);
fail("expected CommitConflictException");
} catch (CommitConflictException expected) {
// expected: clientA has not yet released its txLock on the same keys
}
}
});
/*
* try { Host.getHost(0).getVM(clientB).invoke(() -> TXLockServiceDUnitTest.txLock_DTLS(
* regionLockReqs, participants )); fail("expected CommitConflictException"); } catch
* (RMIException expected) { assertTrue(expected.getCause() instanceof CommitConflictException);
* }
*/
// release txLock
LogWriterUtils.getLogWriter().info("[testTXLock] clientA releases txLock");
Host.getHost(0).getVM(clientA)
.invoke(new SerializableRunnable("[testTXLock] clientA releases txLock") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
dtls.release(testTXLock_TXLockId);
}
});
// NOTE(review): short pause before retrying, presumably to let the release propagate - confirm
sleep(20);
// try the other client again and verify success
LogWriterUtils.getLogWriter().info("[testTXLock] clientB requests txLock");
Host.getHost(0).getVM(clientB)
.invoke(new SerializableRunnable("[testTXLock] clientB requests txLock") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
testTXLock_TXLockId = dtls.txLock(regionLockReqs, participants);
assertNotNull("testTXLock_TXLockId is null", testTXLock_TXLockId);
}
});
// release txLock
LogWriterUtils.getLogWriter().info("[testTXLock] clientB releases txLock");
Host.getHost(0).getVM(clientB)
.invoke(new SerializableRunnable("[testTXLock] clientB releases txLock") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
dtls.release(testTXLock_TXLockId);
}
});
}
/**
 * TXLockId from the most recent successful txLock call in
 * {@link #testTXOriginatorRecoveryProcessor()}. It is assigned inside one remote invocation and
 * read by a later invocation in the same VM.
 */
protected static volatile TXLockId testTXOriginatorRecoveryProcessor_TXLockId;
/**
 * Has an originator VM acquire a txLock with three participants (VMs 1-3, including the grantor),
 * then destroys the originator's services and disconnects it without releasing the txLock.
 * Afterwards a participant must be able to acquire the same txLock, showing that it was
 * released.
 */
@Test
public void testTXOriginatorRecoveryProcessor() {
LogWriterUtils.getLogWriter().info("[testTXOriginatorRecoveryProcessor]");
// VM indexes on host 0 for each role
final int originatorVM = 0;
final int grantorVM = 1;
final int particpantA = 2;
final int particpantB = 3;
final List regionLockReqs = new ArrayList();
Map<Object, Boolean> keymap1 = new HashMap<Object, Boolean>();
keymap1.put("KEY-1", true);
keymap1.put("KEY-2", true);
keymap1.put("KEY-3", true);
keymap1.put("KEY-4", true);
regionLockReqs
.add(new TXRegionLockRequestImpl(SEPARATOR + "testTXOriginatorRecoveryProcessor", keymap1));
// build participants set from the member ids of VMs 1..particpantB (grantor and participants)
InternalDistributedMember dmId = null;
final Set participants = new HashSet();
for (int i = 1; i <= particpantB; i++) {
final int finalvm = i;
dmId = (InternalDistributedMember) Host.getHost(0).getVM(finalvm)
.invoke(() -> TXLockServiceDUnitTest.fetchDistributionManagerId());
assertEquals("dmId should not be null for vm " + finalvm, false, dmId == null);
participants.add(dmId);
}
// create grantor
LogWriterUtils.getLogWriter()
.info("[testTXOriginatorRecoveryProcessor] grantorVM becomes grantor");
Host.getHost(0).getVM(grantorVM).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
Host.getHost(0).getVM(grantorVM)
.invoke(() -> TXLockServiceDUnitTest.identifyLockGrantor_DTLS());
Boolean isGrantor = (Boolean) Host.getHost(0).getVM(grantorVM)
.invoke(() -> TXLockServiceDUnitTest.isLockGrantor_DTLS());
assertEquals("isLockGrantor should not be false for DTLS", Boolean.TRUE, isGrantor);
// have the originatorVM get a txLock with three participants including the grantor
LogWriterUtils.getLogWriter()
.info("[testTXOriginatorRecoveryProcessor] originatorVM requests txLock");
Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable(
"[testTXOriginatorRecoveryProcessor] originatorVM requests txLock") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
testTXOriginatorRecoveryProcessor_TXLockId = dtls.txLock(regionLockReqs, participants);
assertNotNull("testTXOriginatorRecoveryProcessor_TXLockId is null",
testTXOriginatorRecoveryProcessor_TXLockId);
}
});
// create dtls in each participant
Host.getHost(0).getVM(particpantA).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
Host.getHost(0).getVM(particpantB).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.createDTLS(system);
}
});
// disconnect originatorVM without releasing txLock
/*
* doesn't currently trigger the DLockLessorDepartureHandler... TODO
* Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() { public void run() {
* TXLockService.destroyServices(); } });
*/
/*
* Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() { public void run() {
* InternalDistributedSystem sys = (InternalDistributedSystem)
* InternalDistributedSystem.getAnyInstance(); if (sys != null) { sys.disconnect(); } } });
*/
// destroy the originator's lock services, then disconnect it from the distributed system
Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() {
@Override
public void run() {
TXLockService.destroyServices();
}
});
Host.getHost(0).getVM(originatorVM).invoke(() -> disconnectFromDS());
// grantor sends TXOriginatorRecoveryMessage...
// TODO: verify processing of message? and have test sleep until finished
// fixed pause in place of a real completion check (see TODO above)
sleep(200);
// verify txLock is released by acquiring the same txLock from particpantA
Host.getHost(0).getVM(particpantA).invoke(
new SerializableRunnable("[testTXOriginatorRecoveryProcessor] verify txLock is released") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
testTXOriginatorRecoveryProcessor_TXLockId = dtls.txLock(regionLockReqs, participants);
assertNotNull("testTXOriginatorRecoveryProcessor_TXLockId is null",
testTXOriginatorRecoveryProcessor_TXLockId);
}
});
// clean up: particpantA releases the txLock it just acquired
Host.getHost(0).getVM(particpantA).invoke(new SerializableRunnable(
"[testTXOriginatorRecoveryProcessor] particpantA releases txLock") {
@Override
public void run() {
TXLockService dtls = TXLockService.getDTLS();
dtls.release(testTXOriginatorRecoveryProcessor_TXLockId);
}
});
}
/**
 * For each VM of host 0 in turn: creates the DTLS, asserts that it reports itself as distributed,
 * and then locks and unlocks the key "KEY" through it.
 */
@Test
public void testDTLSIsDistributed() {
  LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed]");

  final Host host = Host.getHost(0);
  final int numberOfVMs = host.getVMCount();
  for (int index = 0; index < numberOfVMs; index++) {
    final int vmIndex = index;
    LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] testing vm " + vmIndex);

    host.getVM(vmIndex).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });

    // the DTLS must report that it is distributed
    final Boolean distributed =
        (Boolean) host.getVM(vmIndex).invoke(() -> TXLockServiceDUnitTest.isDistributed_DTLS());
    assertEquals("isDistributed should be true for DTLS", Boolean.TRUE, distributed);
    LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] isDistributed=" + distributed);

    // acquire the lock on "KEY" in that VM...
    final Boolean locked =
        (Boolean) host.getVM(vmIndex).invoke(() -> TXLockServiceDUnitTest.lock_DTLS("KEY"));
    assertEquals("gotLock is false after calling lock_DTLS", Boolean.TRUE, locked);
    LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] gotLock=" + locked);

    // ...and release it again before moving on to the next VM
    final Boolean unlocked =
        (Boolean) host.getVM(vmIndex).invoke(() -> TXLockServiceDUnitTest.unlock_DTLS("KEY"));
    assertEquals("unlock is false after calling unlock_DTLS", Boolean.TRUE, unlocked);
    LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] unlock=" + unlocked);
  }
}
// -------------------------------------------------------------------------
// Static support methods
// -------------------------------------------------------------------------
/**
 * Debugging aid: delegates to {@code DLockService.dumpAllServices()} in the calling VM.
 */
public static void remoteDumpAllDLockServices() {
  DLockService.dumpAllServices();
}
/**
 * Creates a new DistributedLockService in the calling VM, using that VM's {@code system}, and
 * logs the created service.
 *
 * @param name the name of the DistributedLockService to create
 */
protected static void remoteCreateService(String name) {
  logInfo("Created " + DistributedLockService.create(name, system));
}
/**
 * Writes an info-level message to the log writer of this VM's {@code system}.
 *
 * @param msg the text to log
 */
private static void logInfo(final String msg) {
  system.getLogWriter().info(msg);
}
/**
 * Pauses the current thread for the given time, failing the test if the pause is interrupted.
 *
 * @param millis how long to sleep, in milliseconds
 */
private static void sleep(long millis) {
  try {
    Thread.sleep(millis);
  } catch (InterruptedException ex) {
    // Restore the interrupt status before failing so that code further up the stack can still
    // observe the interruption.
    Thread.currentThread().interrupt();
    fail("interrupted");
  }
}
/**
 * Obtains the distributed system through a fresh test instance's {@code getSystem()} and stores
 * it in the static {@code system} field of this VM.
 */
private static void connectDistributedSystem() {
  final TXLockServiceDUnitTest testInstance = new TXLockServiceDUnitTest();
  system = testInstance.getSystem();
}
/**
 * Looks up the named lock service and returns the member it reports as lock grantor. Asserts
 * that both the service and the grantor member are non-null.
 *
 * @param serviceName name of an existing DistributedLockService
 * @return the lock grantor member for that service
 */
private static InternalDistributedMember identifyLockGrantor(String serviceName) {
  final DLockService lockService =
      (DLockService) DistributedLockService.getServiceNamed(serviceName);
  assertNotNull(lockService);

  final InternalDistributedMember grantorMember =
      lockService.getLockGrantorId().getLockGrantorMember();
  assertNotNull(grantorMember);

  logInfo("In identifyLockGrantor - grantor is " + grantorMember);
  return grantorMember;
}
/**
 * Identifies the lock grantor of the DTLS's internal distributed lock service.
 *
 * <p>
 * Invoked in remote VMs by the tests. DO NOT REMOVE
 *
 * @return the lock grantor member
 */
protected static InternalDistributedMember identifyLockGrantor_DTLS() {
  final TXLockServiceImpl dtlsImpl = (TXLockServiceImpl) TXLockService.getDTLS();
  return identifyLockGrantor(dtlsImpl.getInternalDistributedLockService().getName());
}
/**
 * Locks {@code key} through the DTLS's internal distributed lock service.
 *
 * <p>
 * Invoked in remote VMs by the tests. DO NOT REMOVE
 *
 * @param key the name of the lock to acquire
 * @return whether the lock was obtained
 */
protected static Boolean lock_DTLS(Object key) {
  TXLockService dtls = TXLockService.getDTLS();
  // -1 / -1: per the DistributedLockService API, wait without a timeout and hold the lock until
  // it is explicitly unlocked.
  boolean gotLock =
      ((TXLockServiceImpl) dtls).getInternalDistributedLockService().lock(key, -1, -1);
  // Log label now matches this method's name (it previously said "lock_LTLS").
  logInfo("lock_DTLS gotLock (hopefully true): " + gotLock);
  return Boolean.valueOf(gotLock);
}
/**
 * Reports whether the DTLS in this VM is the lock grantor, after asserting that the underlying
 * DLockService (looked up by name) gives the same answer.
 *
 * <p>
 * Invoked in remote VMs by the tests. DO NOT REMOVE.
 *
 * @return {@code dtls.isLockGrantor()} as a Boolean
 */
protected static Boolean isLockGrantor_DTLS() {
  final TXLockService dtls = TXLockService.getDTLS();

  // Cross-check against the internal service registered under the same name.
  final String serviceName =
      ((TXLockServiceImpl) dtls).getInternalDistributedLockService().getName();
  final DLockService namedService = DLockService.getInternalServiceNamed(serviceName);
  assertNotNull(namedService);
  assertEquals("DTLS and DLock should both report same isLockGrantor result", true,
      dtls.isLockGrantor() == namedService.isLockGrantor());

  final Boolean grantor = Boolean.valueOf(dtls.isLockGrantor());
  logInfo("isLockGrantor_DTLS: " + grantor);
  return grantor;
}
/**
 * Reports whether the DTLS's internal distributed lock service is distributed. Also asserts that
 * the service exists, is not destroyed, and that the DLockService registered under the same name
 * gives the same answer.
 *
 * <p>
 * Invoked in remote VMs by the tests. DO NOT REMOVE
 *
 * @return the service's {@code isDistributed()} result as a Boolean
 */
protected static Boolean isDistributed_DTLS() {
  TXLockService dtls = TXLockService.getDTLS();
  DLockService svc = ((TXLockServiceImpl) dtls).getInternalDistributedLockService();
  // Check for null BEFORE using the service, so a missing service fails with an assertion
  // rather than a NullPointerException.
  assertNotNull(svc);
  assertEquals("DTLS InternalDistributedLockService should not be destroyed", false,
      svc.isDestroyed());
  boolean isDistributed = svc.isDistributed();

  // Cross-check against the internal service registered under the same name.
  DLockService service = DLockService.getInternalServiceNamed(svc.getName());
  assertNotNull(service);
  assertEquals("DTLS and DLock should both report same isDistributed result", true,
      isDistributed == service.isDistributed());

  Boolean result = Boolean.valueOf(isDistributed);
  logInfo("isDistributed_DTLS (hopefully true): " + result);
  return result;
}
// private static void becomeLockGrantor(String serviceName) {
// InternalDistributedLockService service = (InternalDistributedLockService)
// DistributedLockService.getServiceNamed(serviceName);
// assertNotNull(service);
// logInfo("About to call becomeLockGrantor...");
// service.becomeLockGrantor();
// }
/**
 * Checks the DTLS life cycle in this VM: no DTLS exists initially; a created DTLS is the one
 * returned by {@code getDTLS()} and is not destroyed; {@code destroyServices()} destroys it and
 * clears {@code getDTLS()}; and a DTLS can then be created again.
 *
 * <p>
 * Invoked by name from {@code testGetAndDestroy()}. DO NOT REMOVE
 */
protected static void checkGetAndDestroy() {
  // nothing exists before the first create
  assertNull(TXLockService.getDTLS());

  // first create
  final TXLockService first = TXLockService.createDTLS(system);
  assertNotNull(first);
  assertEquals(true, TXLockService.getDTLS() == first);
  assertFalse(first.isDestroyed());

  // destroy
  TXLockService.destroyServices();
  assertEquals(true, first.isDestroyed());
  assertNull(TXLockService.getDTLS());

  // create again after destroy
  final TXLockService second = TXLockService.createDTLS(system);
  assertNotNull(second);
  assertEquals(true, TXLockService.getDTLS() == second);
  assertFalse(second.isDestroyed());
}
/**
 * Unlocks {@code key} through the DTLS's internal distributed lock service.
 *
 * <p>
 * Invoked in remote VMs by the tests. DO NOT REMOVE
 *
 * @param key the name of the lock to release
 * @return {@code Boolean.TRUE} if unlock completed, {@code Boolean.FALSE} if it threw
 */
protected static Boolean unlock_DTLS(Object key) {
  TXLockService dtls = TXLockService.getDTLS();
  try {
    ((TXLockServiceImpl) dtls).getInternalDistributedLockService().unlock(key);
    return Boolean.TRUE;
  } catch (Exception e) {
    // Still report failure through the return value, but leave a trace of the reason in the
    // log instead of discarding the exception silently.
    logInfo("unlock_DTLS failed for key " + key + ": " + e);
    return Boolean.FALSE;
  }
}
/**
 * Returns the distribution manager id of any connected distributed system in this VM.
 *
 * <p>
 * Invoked in remote VMs by the tests. DO NOT REMOVE.
 *
 * @return the member id, or {@code null} when {@code getAnyInstance()} returns no system
 */
protected static InternalDistributedMember fetchDistributionManagerId() {
  final InternalDistributedSystem connected = InternalDistributedSystem.getAnyInstance();
  return connected == null ? null : connected.getDistributionManager().getId();
}
/**
 * Destroys the TX lock services in the calling VM. Looked up by name from {@code preTearDown()},
 * so the method name must stay in sync with that call.
 */
private static void destroyServices() {
  TXLockService.destroyServices();
}
// -------------------------------------------------------------------------
// Non-static support methods
// -------------------------------------------------------------------------
/**
 * Assumes there is only one host, and invokes the given method in the first numVMs VMs that host
 * knows about.
 *
 * @param numVMs how many VMs (indexes 0..numVMs-1) to invoke the method in
 * @param methodName name of a method on this test class, resolved by name in each VM
 * @param args arguments passed to the method
 */
public void forNumVMsInvoke(int numVMs, String methodName, Object[] args) {
  Host host = Host.getHost(0);
  for (int i = 0; i < numVMs; i++) {
    // A space was missing before "on", which ran the method name into the rest of the message.
    logInfo("Invoking " + methodName + " on VM#" + i);
    host.getVM(i).invoke(this.getClass(), methodName, args);
  }
}
/**
 * Invokes the given method in every VM that host 0 knows about.
 *
 * @param methodName name of a method on this test class, resolved by name in each VM
 * @param args arguments passed to the method
 */
public void forEachVMInvoke(String methodName, Object[] args) {
  Host host = Host.getHost(0);
  int vmCount = host.getVMCount();
  for (int i = 0; i < vmCount; i++) {
    // A space was missing before "on", which ran the method name into the rest of the message.
    LogWriterUtils.getLogWriter().info("Invoking " + methodName + " on VM#" + i);
    host.getVM(i).invoke(this.getClass(), methodName, args);
  }
}
/**
 * Extends the superclass properties with the dunit log level.
 *
 * @return the superclass properties with {@code LOG_LEVEL} set
 */
@Override
public Properties getDistributedSystemProperties() {
  final Properties properties = super.getDistributedSystemProperties();
  final String dunitLogLevel = LogWriterUtils.getDUnitLogLevel();
  properties.setProperty(LOG_LEVEL, dunitLogLevel);
  return properties;
}
// private synchronized void assertGrantorIsConsistent(Serializable id) {
// if (this.lockGrantor == null) {
// this.lockGrantor = id;
// } else {
// assertIndexDetailsEquals("assertGrantorIsConsistent failed", lockGrantor, id);
// }
// }
// private void distributedCreateService(int numVMs, String serviceName) {
// forEachVMInvoke(
// "remoteCreateService",
// new Object[] { serviceName });
//
// remoteCreateService(serviceName);
// }
/**
 * Sanity check: the DTLS exists in this VM and its internal lock service uses a
 * {@code TXRecoverGrantorMessageProcessor} as its recover-grantor message processor.
 */
private void checkDLockRecoverGrantorMessageProcessor() {
  final TXLockServiceImpl dtlsImpl = (TXLockServiceImpl) TXLockService.getDTLS();
  assertNotNull("DTLS should not be null", dtlsImpl);

  final DLockService internalService = dtlsImpl.getInternalDistributedLockService();
  final boolean usesTxProcessor = internalService
      .getDLockRecoverGrantorMessageProcessor() instanceof TXRecoverGrantorMessageProcessor;
  assertEquals("DTLS should use TXRecoverGrantorMessageProcessor", true, usesTxProcessor);
}
// -------------------------------------------------------------------------
// Inner class
// -------------------------------------------------------------------------
/**
 * Reply processor used by {@code testTXRecoverGrantorMessageProcessor}: it records, in the
 * static PASS flags of the enclosing test class, whether the recovery reply was OK and whether
 * it listed no held locks.
 */
private static class TestDLockRecoverGrantorProcessor extends ReplyProcessor21 {

  public TestDLockRecoverGrantorProcessor(DistributionManager dm, Set members) {
    super(dm.getSystem(), members);
  }

  /** Accepts replies that come from the sender itself. */
  @Override
  protected boolean allowReplyFromSender() {
    return true;
  }

  /**
   * Inspects the reply and stores the two verdicts. The message is cast unconditionally to
   * {@code DLockRecoverGrantorReplyMessage}.
   */
  @Override
  public void process(DistributionMessage msg) {
    final DLockRecoverGrantorProcessor.DLockRecoverGrantorReplyMessage recoverReply =
        (DLockRecoverGrantorProcessor.DLockRecoverGrantorReplyMessage) msg;

    final int okCode = DLockRecoverGrantorProcessor.DLockRecoverGrantorReplyMessage.OK;
    testTXRecoverGrantor_replyCode_PASS = recoverReply.getReplyCode() == okCode;

    final int heldLockCount = recoverReply.getHeldLocks().length;
    testTXRecoverGrantor_heldLocks_PASS = heldLockCount == 0;
  }
}
}