/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.locks;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor;
import org.apache.geode.distributed.internal.locks.DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXRegionLockRequestImpl;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DLockTest;

/**
 * This class tests distributed ownership via the DistributedLockService api.
 */
@Category({DLockTest.class})
public class TXLockServiceDUnitTest extends JUnit4DistributedTestCase {

  private static InternalDistributedSystem system;

  protected static boolean testTXRecoverGrantor_replyCode_PASS = false;
  protected static boolean testTXRecoverGrantor_heldLocks_PASS = false;

  private InternalDistributedMember lockGrantor;

  public TXLockServiceDUnitTest() {
    super();
  }

  // -------------------------------------------------------------------------
  // Lifecycle methods
  // -------------------------------------------------------------------------

  /**
   * Returns a previously created (or new, if this is the first time this method is called in this
   * VM) distributed system which is somewhat configurable via hydra test parameters.
   */
  @Override
  public final void postSetUp() throws Exception {
    Invoke.invokeInEveryVM("connectDistributedSystem", () -> connectDistributedSystem());
    connectDistributedSystem();
  }


  @Override
  public final void preTearDown() throws Exception {
    // invokeInEveryVM(TXLockServiceDUnitTest.class,
    // "remoteDumpAllDLockServices");

    Invoke.invokeInEveryVM(TXLockServiceDUnitTest.class, "destroyServices");

    destroyServices();

    // // Disconnects the DistributedSystem in every VM - since
    // // each test randomly chooses whether shared memory is used
    // disconnectAllFromDS();

    this.lockGrantor = null;

    testTXRecoverGrantor_replyCode_PASS = false;
    testTXRecoverGrantor_heldLocks_PASS = false;
  }

  // -------------------------------------------------------------------------
  // Test methods
  // -------------------------------------------------------------------------

  @Test
  public void testGetAndDestroy() {
    forEachVMInvoke("checkGetAndDestroy", new Object[] {});
    /*
     * invokeInEveryVM(TXLockServiceDUnitTest.class, "destroyServices");
     * forEachVMInvoke("checkGetAndDestroy", new Object[] {});
     */
  }

  @Test
  public void testGetAndDestroyAgain() {
    testGetAndDestroy();
  }

  @Test
  public void testTXRecoverGrantorMessageProcessor() throws Exception {
    TXLockService.createDTLS(system);
    checkDLockRecoverGrantorMessageProcessor();

    /*
     * call TXRecoverGrantorMessageProcessor.process directly to make sure that correct behavior
     * occurs
     */

    // get txLock and hold it
    final Set participants = Collections.EMPTY_SET;
    final List regionLockReqs = new ArrayList();

    Map<Object, Boolean> keymap = new HashMap<Object, Boolean>();
    keymap.put("KEY-1", true);
    keymap.put("KEY-2", true);
    keymap.put("KEY-3", true);
    keymap.put("KEY-4", true);
    regionLockReqs
        .add(new TXRegionLockRequestImpl(SEPARATOR + "testTXRecoverGrantorMessageProcessor",
            keymap));

    TXLockService dtls = TXLockService.getDTLS();
    TXLockId txLockId = dtls.txLock(regionLockReqs, participants);

    // async call TXRecoverGrantorMessageProcessor.process
    final DLockService dlock = ((TXLockServiceImpl) dtls).getInternalDistributedLockService();
    final TestDLockRecoverGrantorProcessor testProc =
        new TestDLockRecoverGrantorProcessor(dlock.getDistributionManager(), Collections.EMPTY_SET);
    assertEquals("No valid processorId", true, testProc.getProcessorId() > -1);

    final DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage msg =
        new DLockRecoverGrantorProcessor.DLockRecoverGrantorMessage();
    msg.setServiceName(dlock.getName());
    msg.setProcessorId(testProc.getProcessorId());
    msg.setSender(dlock.getDistributionManager().getId());

    Thread thread = new Thread(() -> {
      TXRecoverGrantorMessageProcessor proc =
          (TXRecoverGrantorMessageProcessor) dlock.getDLockRecoverGrantorMessageProcessor();
      proc.processDLockRecoverGrantorMessage(dlock.getDistributionManager(), msg);
    });
    thread.setName("TXLockServiceDUnitTest thread");
    thread.setDaemon(true);
    thread.start();

    await("waiting for recovery message to block").until(() -> {
      return ((TXLockServiceImpl) dtls).isRecovering();
    });

    dtls.release(txLockId);

    // check results to verify no locks were provided in the reply
    await("waiting for thread to exit").until(() -> {
      return !thread.isAlive();
    });

    assertFalse(((TXLockServiceImpl) dtls).isRecovering());

    assertEquals("testTXRecoverGrantor_replyCode_PASS is false", true,
        testTXRecoverGrantor_replyCode_PASS);
    assertEquals("testTXRecoverGrantor_heldLocks_PASS is false", true,
        testTXRecoverGrantor_heldLocks_PASS);
  }


  @Test
  public void testTXGrantorMigration() throws Exception {
    // first make sure some other VM is the grantor
    Host.getHost(0).getVM(0).invoke("become lock grantor", () -> {
      TXLockService.createDTLS(system);
      TXLockService vm0dtls = TXLockService.getDTLS();
      DLockService vm0dlock = ((TXLockServiceImpl) vm0dtls).getInternalDistributedLockService();
      vm0dlock.becomeLockGrantor();
    });

    TXLockService.createDTLS(system);
    checkDLockRecoverGrantorMessageProcessor();

    /*
     * call TXRecoverGrantorMessageProcessor.process directly to make sure that correct behavior
     * occurs
     */

    // get txLock and hold it
    final List regionLockReqs = new ArrayList();

    Map<Object, Boolean> keymap = new HashMap<Object, Boolean>();
    keymap.put("KEY-1", true);
    keymap.put("KEY-2", true);
    keymap.put("KEY-3", true);
    keymap.put("KEY-4", true);
    regionLockReqs
        .add(new TXRegionLockRequestImpl(SEPARATOR + "testTXRecoverGrantorMessageProcessor2",
            keymap));

    TXLockService dtls = TXLockService.getDTLS();
    TXLockId txLockId = dtls.txLock(regionLockReqs, Collections.EMPTY_SET);

    final DLockService dlock = ((TXLockServiceImpl) dtls).getInternalDistributedLockService();

    // GEODE-2024: now cause grantor migration while holding the recoveryReadLock.
    // It will lock up in TXRecoverGrantorMessageProcessor until the recoveryReadLock
    // is released. Demonstrate that dtls.release() does not block forever and releases the
    // recoveryReadLock
    // allowing grantor migration to finish

    // create an observer that will block recovery messages from being processed
    MessageObserver observer = new MessageObserver();
    DistributionMessageObserver.setInstance(observer);

    try {
      System.out.println("starting thread to take over being lock grantor from vm0");

      // become the grantor - this will block waiting for a reply to the message blocked by the
      // observer
      Thread thread = new Thread(() -> {
        dlock.becomeLockGrantor();
      });
      thread.setName("TXLockServiceDUnitTest thread2");
      thread.setDaemon(true);
      thread.start();

      await("waiting for recovery to begin").until(() -> {
        return observer.isPreventingProcessing();
      });


      // spawn a thread that will unblock message processing
      // so that TXLockServiceImpl's "recovering" variable will be set
      System.out.println("starting a thread to unblock recovery in 5 seconds");
      Thread unblockThread = new Thread(() -> {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          throw new RuntimeException("sleep interrupted");
        }
        System.out.println("releasing block of recovery message processing");
        observer.releasePreventionOfProcessing();
      });
      unblockThread.setName("TXLockServiceDUnitTest unblockThread");
      unblockThread.setDaemon(true);
      unblockThread.start();

      // release txLock - this will block until unblockThread tells the observer
      // that it can process its message. Then it should release the recovery read-lock
      // allowing the grantor to finish recovery
      System.out.println("releasing transaction locks, which should block for a bit");
      dtls.release(txLockId);

      await("waiting for recovery to finish").until(() -> {
        return !((TXLockServiceImpl) dtls).isRecovering();
      });
    } finally {
      observer.releasePreventionOfProcessing();
      DistributionMessageObserver.setInstance(null);
    }
  }

  static class MessageObserver extends DistributionMessageObserver {
    final boolean[] preventingMessageProcessing = new boolean[] {false};
    final boolean[] preventMessageProcessing = new boolean[] {true};


    public boolean isPreventingProcessing() {
      synchronized (preventingMessageProcessing) {
        return preventingMessageProcessing[0];
      }
    }

    public void releasePreventionOfProcessing() {
      synchronized (preventMessageProcessing) {
        preventMessageProcessing[0] = false;
      }
    }

    @Override
    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
      if (message instanceof DLockRecoverGrantorMessage) {
        synchronized (preventingMessageProcessing) {
          preventingMessageProcessing[0] = true;
        }
        synchronized (preventMessageProcessing) {
          while (preventMessageProcessing[0]) {
            try {
              preventMessageProcessing.wait(50);
            } catch (InterruptedException e) {
              throw new RuntimeException("sleep interrupted");
            }
          }
        }
      }
    }

  }


  protected static volatile TXLockId testTXLock_TXLockId;

  @Test
  public void testTXLock() {
    LogWriterUtils.getLogWriter().info("[testTXLock]");
    final int grantorVM = 0;
    final int clientA = 1;
    final int clientB = 2;
    final Set participants = Collections.EMPTY_SET;
    final List regionLockReqs = new ArrayList();

    Map<Object, Boolean> keymap1 = new HashMap<Object, Boolean>();
    keymap1.put("KEY-1", true);
    keymap1.put("KEY-2", true);
    keymap1.put("KEY-3", true);
    keymap1.put("KEY-4", true);
    Map<Object, Boolean> keymap2 = new HashMap<Object, Boolean>();
    keymap2.put("KEY-A", true);
    keymap2.put("KEY-B", true);
    keymap2.put("KEY-C", true);
    keymap2.put("KEY-D", true);

    regionLockReqs.add(new TXRegionLockRequestImpl(SEPARATOR + "testTXLock1", keymap1));
    regionLockReqs.add(new TXRegionLockRequestImpl(SEPARATOR + "testTXLock2", keymap2));

    // create grantor
    LogWriterUtils.getLogWriter().info("[testTXLock] create grantor");

    Host.getHost(0).getVM(grantorVM).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });
    sleep(20);

    // create client and request txLock
    LogWriterUtils.getLogWriter().info("[testTXLock] create clientA and request txLock");

    Host.getHost(0).getVM(clientA).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });

    Host.getHost(0).getVM(clientA)
        .invoke(new SerializableRunnable("[testTXLock] create clientA and request txLock") {
          @Override
          public void run() {
            TXLockService dtls = TXLockService.getDTLS();
            testTXLock_TXLockId = dtls.txLock(regionLockReqs, participants);
            assertNotNull("testTXLock_TXLockId is null", testTXLock_TXLockId);
          }
        });

    // create nuther client and request overlapping txLock... verify fails
    LogWriterUtils.getLogWriter().info("[testTXLock] create clientB and fail txLock");

    Host.getHost(0).getVM(clientB).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });

    Host.getHost(0).getVM(clientB).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        try {
          TXLockService dtls = TXLockService.getDTLS();
          dtls.txLock(regionLockReqs, participants);
          fail("expected CommitConflictException");
        } catch (CommitConflictException expected) {
        }
      }
    });

    /*
     * try { Host.getHost(0).getVM(clientB).invoke(() -> TXLockServiceDUnitTest.txLock_DTLS(
     * regionLockReqs, participants )); fail("expected CommitConflictException"); } catch
     * (RMIException expected) { assertTrue(expected.getCause() instanceof CommitConflictException);
     * }
     */

    // release txLock
    LogWriterUtils.getLogWriter().info("[testTXLock] clientA releases txLock");

    Host.getHost(0).getVM(clientA)
        .invoke(new SerializableRunnable("[testTXLock] clientA releases txLock") {
          @Override
          public void run() {
            TXLockService dtls = TXLockService.getDTLS();
            dtls.release(testTXLock_TXLockId);
          }
        });
    sleep(20);

    // try nuther client again and verify success
    LogWriterUtils.getLogWriter().info("[testTXLock] clientB requests txLock");

    Host.getHost(0).getVM(clientB)
        .invoke(new SerializableRunnable("[testTXLock] clientB requests txLock") {
          @Override
          public void run() {
            TXLockService dtls = TXLockService.getDTLS();
            testTXLock_TXLockId = dtls.txLock(regionLockReqs, participants);
            assertNotNull("testTXLock_TXLockId is null", testTXLock_TXLockId);
          }
        });

    // release txLock
    LogWriterUtils.getLogWriter().info("[testTXLock] clientB releases txLock");

    Host.getHost(0).getVM(clientB)
        .invoke(new SerializableRunnable("[testTXLock] clientB releases txLock") {
          @Override
          public void run() {
            TXLockService dtls = TXLockService.getDTLS();
            dtls.release(testTXLock_TXLockId);
          }
        });
  }

  protected static volatile TXLockId testTXOriginatorRecoveryProcessor_TXLockId;

  @Test
  public void testTXOriginatorRecoveryProcessor() {
    LogWriterUtils.getLogWriter().info("[testTXOriginatorRecoveryProcessor]");
    final int originatorVM = 0;
    final int grantorVM = 1;
    final int particpantA = 2;
    final int particpantB = 3;

    final List regionLockReqs = new ArrayList();

    Map<Object, Boolean> keymap1 = new HashMap<Object, Boolean>();
    keymap1.put("KEY-1", true);
    keymap1.put("KEY-2", true);
    keymap1.put("KEY-3", true);
    keymap1.put("KEY-4", true);

    regionLockReqs
        .add(new TXRegionLockRequestImpl(SEPARATOR + "testTXOriginatorRecoveryProcessor", keymap1));

    // build participants set...
    InternalDistributedMember dmId = null;
    final Set participants = new HashSet();
    for (int i = 1; i <= particpantB; i++) {
      final int finalvm = i;
      dmId = (InternalDistributedMember) Host.getHost(0).getVM(finalvm)
          .invoke(() -> TXLockServiceDUnitTest.fetchDistributionManagerId());
      assertEquals("dmId should not be null for vm " + finalvm, false, dmId == null);
      participants.add(dmId);
    }

    // create grantor
    LogWriterUtils.getLogWriter()
        .info("[testTXOriginatorRecoveryProcessor] grantorVM becomes grantor");

    Host.getHost(0).getVM(grantorVM).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });

    Host.getHost(0).getVM(grantorVM)
        .invoke(() -> TXLockServiceDUnitTest.identifyLockGrantor_DTLS());

    Boolean isGrantor = (Boolean) Host.getHost(0).getVM(grantorVM)
        .invoke(() -> TXLockServiceDUnitTest.isLockGrantor_DTLS());
    assertEquals("isLockGrantor should not be false for DTLS", Boolean.TRUE, isGrantor);

    // have a originatorVM get a txLock with three participants including grantor
    LogWriterUtils.getLogWriter()
        .info("[testTXOriginatorRecoveryProcessor] originatorVM requests txLock");

    Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });
    Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable(
        "[testTXOriginatorRecoveryProcessor] originatorVM requests txLock") {
      @Override
      public void run() {
        TXLockService dtls = TXLockService.getDTLS();
        testTXOriginatorRecoveryProcessor_TXLockId = dtls.txLock(regionLockReqs, participants);
        assertNotNull("testTXOriginatorRecoveryProcessor_TXLockId is null",
            testTXOriginatorRecoveryProcessor_TXLockId);
      }
    });

    // create dtls in each participant
    Host.getHost(0).getVM(particpantA).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });
    Host.getHost(0).getVM(particpantB).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.createDTLS(system);
      }
    });

    // disconnect originatorVM without releasing txLock
    /*
     * doesn't currently trigger the DLockLessorDepatureHandler... TODO
     * Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() { public void run() {
     * TXLockService.destroyServices(); } });
     */

    /*
     * Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() { public void run() {
     * InternalDistributedSystem sys = (InternalDistributedSystem)
     * InternalDistributedSystem.getAnyInstance(); if (sys != null) { sys.disconnect(); } } });
     */

    Host.getHost(0).getVM(originatorVM).invoke(new SerializableRunnable() {
      @Override
      public void run() {
        TXLockService.destroyServices();
      }
    });
    Host.getHost(0).getVM(originatorVM).invoke(() -> disconnectFromDS());

    // grantor sends TXOriginatorRecoveryMessage...
    // TODO: verify processing of message? and have test sleep until finished
    sleep(200);

    // verify txLock is released...
    Host.getHost(0).getVM(particpantA).invoke(
        new SerializableRunnable("[testTXOriginatorRecoveryProcessor] verify txLock is released") {
          @Override
          public void run() {
            TXLockService dtls = TXLockService.getDTLS();
            testTXOriginatorRecoveryProcessor_TXLockId = dtls.txLock(regionLockReqs, participants);
            assertNotNull("testTXOriginatorRecoveryProcessor_TXLockId is null",
                testTXOriginatorRecoveryProcessor_TXLockId);
          }
        });

    Host.getHost(0).getVM(particpantA).invoke(new SerializableRunnable(
        "[testTXOriginatorRecoveryProcessor] particpantA releases txLock") {
      @Override
      public void run() {
        TXLockService dtls = TXLockService.getDTLS();
        dtls.release(testTXOriginatorRecoveryProcessor_TXLockId);
      }
    });
  }

  @Test
  public void testDTLSIsDistributed() {
    LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed]");

    // have all vms lock and hold the same LTLS lock simultaneously
    final Host host = Host.getHost(0);
    int vmCount = host.getVMCount();
    for (int vm = 0; vm < vmCount; vm++) {
      final int finalvm = vm;
      LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] testing vm " + finalvm);

      Host.getHost(0).getVM(finalvm).invoke(new SerializableRunnable() {
        @Override
        public void run() {
          TXLockService.createDTLS(system);
        }
      });

      // assert that isDistributed returns false
      Boolean isDistributed =
          (Boolean) host.getVM(finalvm).invoke(() -> TXLockServiceDUnitTest.isDistributed_DTLS());
      assertEquals("isDistributed should be true for DTLS", Boolean.TRUE, isDistributed);
      LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] isDistributed=" + isDistributed);

      // lock a key...
      Boolean gotLock =
          (Boolean) host.getVM(finalvm).invoke(() -> TXLockServiceDUnitTest.lock_DTLS("KEY"));
      assertEquals("gotLock is false after calling lock_DTLS", Boolean.TRUE, gotLock);
      LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] gotLock=" + gotLock);

      // unlock it...
      Boolean unlock =
          (Boolean) host.getVM(finalvm).invoke(() -> TXLockServiceDUnitTest.unlock_DTLS("KEY"));
      assertEquals("unlock is false after calling unlock_DTLS", Boolean.TRUE, unlock);
      LogWriterUtils.getLogWriter().info("[testDTLSIsDistributed] unlock=" + unlock);
    }
  }

  // -------------------------------------------------------------------------
  // Static support methods
  // -------------------------------------------------------------------------

  public static void remoteDumpAllDLockServices() {
    DLockService.dumpAllServices();
  }

  /**
   * Creates a new DistributedLockService in a remote VM.
   *
   * @param name The name of the newly-created DistributedLockService. It is recommended that the
   *        name of the Region be the {@link #getUniqueName()} of the test, or at least derive from
   *        it.
   */
  protected static void remoteCreateService(String name) {
    DistributedLockService newService = DistributedLockService.create(name, system);
    logInfo("Created " + newService);
  }

  private static void logInfo(String msg) {
    system.getLogWriter().info(msg);
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ex) {
      fail("interrupted");
    }
  }

  /**
   * Connects a DistributedSystem, saves it in static variable "system"
   */
  private static void connectDistributedSystem() {
    system = (new TXLockServiceDUnitTest()).getSystem();
  }

  private static InternalDistributedMember identifyLockGrantor(String serviceName) {
    DLockService service = (DLockService) DistributedLockService.getServiceNamed(serviceName);
    assertNotNull(service);

    InternalDistributedMember grantor = service.getLockGrantorId().getLockGrantorMember();
    assertNotNull(grantor);
    logInfo("In identifyLockGrantor - grantor is " + grantor);
    return grantor;
  }

  /**
   * Accessed via reflection. DO NOT REMOVE
   */
  protected static InternalDistributedMember identifyLockGrantor_DTLS() {
    TXLockService dtls = TXLockService.getDTLS();
    String serviceName = ((TXLockServiceImpl) dtls).getInternalDistributedLockService().getName();
    return identifyLockGrantor(serviceName);
  }

  /**
   * Accessed via reflection. DO NOT REMOVE
   */
  protected static Boolean lock_DTLS(Object key) {
    TXLockService dtls = TXLockService.getDTLS();
    boolean gotLock =
        ((TXLockServiceImpl) dtls).getInternalDistributedLockService().lock(key, -1, -1);
    logInfo("lock_LTLS gotLock (hopefully true): " + gotLock);
    return Boolean.valueOf(gotLock);
  }

  /**
   * Accessed via reflection. DO NOT REMOVE.
   */
  protected static Boolean isLockGrantor_DTLS() {
    TXLockService dtls = TXLockService.getDTLS();

    if (true) {
      DLockService service = DLockService.getInternalServiceNamed(
          ((TXLockServiceImpl) dtls).getInternalDistributedLockService().getName());
      assertNotNull(service);

      assertEquals("DTLS and DLock should both report same isLockGrantor result", true,
          dtls.isLockGrantor() == service.isLockGrantor());
    }

    Boolean result = Boolean.valueOf(dtls.isLockGrantor());
    logInfo("isLockGrantor_DTLS: " + result);
    return result;
  }

  /**
   * Accessed via reflection. DO NOT REMOVE
   */
  protected static Boolean isDistributed_DTLS() {
    TXLockService dtls = TXLockService.getDTLS();
    boolean isDistributed =
        ((TXLockServiceImpl) dtls).getInternalDistributedLockService().isDistributed();

    DLockService svc = ((TXLockServiceImpl) dtls).getInternalDistributedLockService();
    assertNotNull(svc);
    assertEquals("DTLS InternalDistributedLockService should not be destroyed", false,
        svc.isDestroyed());

    // sleep(50);

    if (true) {
      DLockService service = DLockService.getInternalServiceNamed(svc.getName());
      assertNotNull(service);

      assertEquals("DTLS and DLock should both report same isDistributed result", true,
          isDistributed == service.isDistributed());
    }

    Boolean result = Boolean.valueOf(isDistributed);
    logInfo("isDistributed_DTLS (hopefully true): " + result);
    return result;
  }

  // private static void becomeLockGrantor(String serviceName) {
  // InternalDistributedLockService service = (InternalDistributedLockService)
  // DistributedLockService.getServiceNamed(serviceName);
  // assertNotNull(service);
  // logInfo("About to call becomeLockGrantor...");
  // service.becomeLockGrantor();
  // }

  /**
   * Accessed via reflection. DO NOT REMOVE
   */
  protected static void checkGetAndDestroy() {
    assertNull(TXLockService.getDTLS());

    TXLockService dtls = TXLockService.createDTLS(system);
    assertNotNull(dtls);
    assertEquals(true, dtls == TXLockService.getDTLS());
    assertEquals(false, dtls.isDestroyed());

    TXLockService.destroyServices();
    assertEquals(true, dtls.isDestroyed());
    assertNull(TXLockService.getDTLS());

    dtls = TXLockService.createDTLS(system);
    assertNotNull(dtls);
    assertEquals(true, dtls == TXLockService.getDTLS());
    assertEquals(false, dtls.isDestroyed());
  }

  /**
   * Accessed via reflection. DO NOT REMOVE
   */
  protected static Boolean unlock_DTLS(Object key) {
    TXLockService dtls = TXLockService.getDTLS();
    try {
      ((TXLockServiceImpl) dtls).getInternalDistributedLockService().unlock(key);
      return Boolean.TRUE;
    } catch (Exception e) {
      return Boolean.FALSE;
    }
  }


  /**
   * Accessed via reflection. DO NOT REMOVE.
   */
  protected static InternalDistributedMember fetchDistributionManagerId() {
    InternalDistributedSystem sys = InternalDistributedSystem.getAnyInstance();
    if (sys != null) {
      return sys.getDistributionManager().getId();
    } else {
      return null;
    }
  }

  private static void destroyServices() {
    TXLockService.destroyServices();
  }

  // -------------------------------------------------------------------------
  // Non-static support methods
  // -------------------------------------------------------------------------

  /**
   * Assumes there is only one host, and invokes the given method in the first numVMs VMs that host
   * knows about.
   */
  public void forNumVMsInvoke(int numVMs, String methodName, Object[] args) {
    Host host = Host.getHost(0);
    for (int i = 0; i < numVMs; i++) {
      logInfo("Invoking " + methodName + "on VM#" + i);
      host.getVM(i).invoke(this.getClass(), methodName, args);
    }
  }

  public void forEachVMInvoke(String methodName, Object[] args) {
    Host host = Host.getHost(0);
    int vmCount = host.getVMCount();
    for (int i = 0; i < vmCount; i++) {
      LogWriterUtils.getLogWriter().info("Invoking " + methodName + "on VM#" + i);
      host.getVM(i).invoke(this.getClass(), methodName, args);
    }
  }

  @Override
  public Properties getDistributedSystemProperties() {
    Properties props = super.getDistributedSystemProperties();
    props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
    return props;
  }

  // private synchronized void assertGrantorIsConsistent(Serializable id) {
  // if (this.lockGrantor == null) {
  // this.lockGrantor = id;
  // } else {
  // assertIndexDetailsEquals("assertGrantorIsConsistent failed", lockGrantor, id);
  // }
  // }

  // private void distributedCreateService(int numVMs, String serviceName) {
  // forEachVMInvoke(
  // "remoteCreateService",
  // new Object[] { serviceName });
  //
  // remoteCreateService(serviceName);
  // }

  private void checkDLockRecoverGrantorMessageProcessor() {
    /*
     * simple test to make sure getDLockRecoverGrantorMessageProcessor returns instance of
     * TXRecoverGrantorMessageProcessor
     */
    DLockService dlock = null;

    TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
    assertNotNull("DTLS should not be null", dtls);
    dlock = dtls.getInternalDistributedLockService();
    assertEquals("DTLS should use TXRecoverGrantorMessageProcessor", true,
        dlock.getDLockRecoverGrantorMessageProcessor() instanceof TXRecoverGrantorMessageProcessor);
  }

  // -------------------------------------------------------------------------
  // Inner class
  // -------------------------------------------------------------------------

  private static class TestDLockRecoverGrantorProcessor extends ReplyProcessor21 {
    public TestDLockRecoverGrantorProcessor(DistributionManager dm, Set members) {
      super(dm.getSystem(), members);
    }

    @Override
    protected boolean allowReplyFromSender() {
      return true;
    }

    @Override
    public void process(DistributionMessage msg) {
      DLockRecoverGrantorProcessor.DLockRecoverGrantorReplyMessage reply =
          (DLockRecoverGrantorProcessor.DLockRecoverGrantorReplyMessage) msg;
      testTXRecoverGrantor_replyCode_PASS =
          (reply.getReplyCode() == DLockRecoverGrantorProcessor.DLockRecoverGrantorReplyMessage.OK);
      testTXRecoverGrantor_heldLocks_PASS = (reply.getHeldLocks().length == 0);
    }
  }

}
