/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;

import org.apache.logging.log4j.Logger;

import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.TransactionInDoubtException;
import org.apache.geode.cache.TransactionListener;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.TXManagerCancelledException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.MapCallback;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;

/**
 * The internal implementation of the {@link CacheTransactionManager} interface returned by
 * {@link InternalCache#getCacheTransactionManager}. Internal operations such as
 * {@code TransactionListener} invocation, Region synchronization, transaction statistics and
 * transaction logging are handled here.
 *
 * @since GemFire 4.0
 *
 * @see CacheTransactionManager
 */
public class TXManagerImpl implements CacheTransactionManager, MembershipListener {

  private static final Logger logger = LogService.getLogger();

  // Per-thread transaction context. Holds the TXStateProxy bound to the calling thread, the
  // PAUSED sentinel while that thread has paused its transaction, or null when there is none.
  private final ThreadLocal<TXStateProxy> txContext;

  // Per-thread flag set to true when a pause is requested while the thread has no transaction,
  // and reset to false by the matching unpause.
  private final ThreadLocal<Boolean> pauseJTA;

  // Most recently constructed manager (assigned by the constructor); may also be replaced
  // through setCurrentInstanceForTest.
  @MakeNotStatic
  private static TXManagerImpl currentInstance = null;

  // Counter from which the unique id part of each new TXId is drawn.
  private final AtomicInteger uniqId;

  private final DistributionManager dm;
  private final InternalCache cache;

  // The distribution manager id of this member, used as the member part of new TXIds.
  private final InternalDistributedMember distributionMgrId;

  private final CachePerfStats cachePerfStats;

  // Shared result returned by getListeners() when no listener is registered.
  @Immutable
  private static final TransactionListener[] EMPTY_LISTENERS = new TransactionListener[0];

  /**
   * Default transaction id to indicate no transaction.
   */
  public static final int NOTX = -1;

  // Registered transaction listeners; every access synchronizes on this list.
  private final List<TransactionListener> txListeners = new ArrayList<>(8);

  // The registered TransactionWriter, or null; read by getWriter() and assigned by setWriter().
  public TransactionWriter writer = null;

  // Set to true by close(); checked by checkClosed() before transactional operations.
  private volatile boolean closed = false;

  // Transaction states hosted on behalf of remote members/clients, keyed by TXId. Backed by a
  // plain HashMap; all writes happen while synchronized on the map itself.
  private final Map<TXId, TXStateProxy> hostedTXStates;

  // Used for testing only. Non-null only when the "trackScheduledToBeRemovedTx" system property
  // (with the GemFire prefix) is set to true.
  private final Set<TXId> scheduledToBeRemovedTx =
      Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx")
          ? ConcurrentHashMap.newKeySet() : null;

  /**
   * The number of client initiated transactions to store for client failover. Read from the
   * "transactionFailoverMapSize" system property (with the GemFire prefix); defaults to 1000.
   */
  public static final int FAILOVER_TX_MAP_SIZE =
      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "transactionFailoverMapSize", 1000);

  /**
   * Stores the {@link TXCommitMessage}s of client initiated transactions so that, when a client
   * fails over (after the delegate dies), the commit message can still be sent to the client.
   * The map is synchronized and bounded: once it holds more than {@link #FAILOVER_TX_MAP_SIZE}
   * entries, the eldest entry is evicted on the next insertion.
   *
   * TODO: we really need to keep around only one msg for each thread on a client
   */
  private final Map<TXId, TXCommitMessage> failoverMap =
      Collections.synchronizedMap(new LinkedHashMap<TXId, TXCommitMessage>() {
        // TODO: inner class is serializable but outer class is not
        private static final long serialVersionUID = -4156018226167594134L;

        @Override
        protected boolean removeEldestEntry(Entry<TXId, TXCommitMessage> eldest) {
          // Evaluate the capacity check once so the logged decision and the returned decision
          // can never disagree.
          final boolean overCapacity = size() > FAILOVER_TX_MAP_SIZE;
          if (logger.isDebugEnabled()) {
            logger.debug("TX: removing client initiated transaction from failover map:{} :{}",
                eldest.getKey(), overCapacity);
          }
          return overCapacity;
        }
      });

  /**
   * A flag to allow persistent transactions. Public for testing. Initialized from the
   * "ALLOW_PERSISTENT_TRANSACTIONS" system property (with the GemFire prefix).
   */
  @MutableForTesting
  public static boolean ALLOW_PERSISTENT_TRANSACTIONS =
      Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "ALLOW_PERSISTENT_TRANSACTIONS");

  // Starting value of the unique-id counter for managers constructed afterwards.
  @MutableForTesting
  static int INITIAL_UNIQUE_ID_VALUE = 0;

  /**
   * Keeps track of all the transactions that were initiated locally. Entries are added by
   * {@link #begin()} and removed when the transaction is cleaned up after commit or rollback.
   */
  private final ConcurrentMap<TXId, TXStateProxy> localTxMap = new ConcurrentHashMap<>();

  /**
   * The time in minutes after which any suspended transaction is rolled back. The default is 30
   * minutes; it can be overridden with the "suspendedTxTimeout" system property (with the GemFire
   * prefix).
   */
  private volatile long suspendedTXTimeout =
      Long.getLong(GeodeGlossary.GEMFIRE_PREFIX + "suspendedTxTimeout", 30);

  /**
   * Thread-specific flag to indicate whether the transactions managed by this
   * CacheTransactionManager for this thread should be distributed.
   */
  private final ThreadLocal<Boolean> isTXDistributed;

  /**
   * The number of seconds to keep transaction states for disconnected clients. This allows the
   * client to fail over to another server and still find the transaction state to complete the
   * transaction.
   */
  private int transactionTimeToLive;

  // Clock used to time commit and rollback operations; also handed to each new TXStateProxy.
  private final StatisticsClock statisticsClock;

  /**
   * Constructor that implements the {@link CacheTransactionManager} interface. Only only one
   * instance per {@link org.apache.geode.cache.Cache}
   */
  public TXManagerImpl(CachePerfStats cachePerfStats, InternalCache cache,
      StatisticsClock statisticsClock) {
    this.cache = cache;
    dm = ((InternalDistributedSystem) cache.getDistributedSystem()).getDistributionManager();
    distributionMgrId = dm.getDistributionManagerId();
    uniqId = new AtomicInteger(INITIAL_UNIQUE_ID_VALUE);
    this.cachePerfStats = cachePerfStats;
    hostedTXStates = new HashMap<>();
    txContext = new ThreadLocal<>();
    pauseJTA = new ThreadLocal<>();
    isTXDistributed = new ThreadLocal<>();
    transactionTimeToLive = Integer
        .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "cacheServer.transactionTimeToLive", 180);
    currentInstance = this;
    this.statisticsClock = statisticsClock;
  }

  /**
   * Test hook: returns the manager currently stored in the static instance slot, or null.
   */
  public static TXManagerImpl getCurrentInstanceForTest() {
    return TXManagerImpl.currentInstance;
  }

  /**
   * Test hook: replaces the manager stored in the static instance slot.
   *
   * @param instance the manager to install; may be null
   */
  public static void setCurrentInstanceForTest(TXManagerImpl instance) {
    TXManagerImpl.currentInstance = instance;
  }

  /**
   * @return the cache this transaction manager was created for
   */
  InternalCache getCache() {
    return this.cache;
  }

  /**
   * Returns the {@link TransactionWriter} registered on this manager.
   *
   * @return the current TransactionWriter, or null if none has been set
   * @see TransactionWriter
   */
  @Override
  public TransactionWriter getWriter() {
    return this.writer;
  }

  /**
   * Registers the {@link TransactionWriter} for this manager.
   *
   * @param writer the writer to install; may be null
   * @throws IllegalStateException if the cache is a client cache
   */
  @Override
  public void setWriter(TransactionWriter writer) {
    final boolean onClient = cache.isClient();
    if (onClient) {
      throw new IllegalStateException(
          "A TransactionWriter cannot be registered on a client");
    }
    this.writer = writer;
  }

  /**
   * Returns the single registered transaction listener.
   *
   * @return the listener, or null when none is registered
   * @throws IllegalStateException if more than one listener is registered
   */
  @Override
  public TransactionListener getListener() {
    synchronized (txListeners) {
      switch (txListeners.size()) {
        case 0:
          return null;
        case 1:
          return txListeners.get(0);
        default:
          throw new IllegalStateException(
              "More than one transaction listener exists.");
      }
    }
  }

  /**
   * Returns a snapshot of the registered transaction listeners.
   *
   * @return a freshly allocated array of the listeners, or the shared empty array when there
   *         are none
   */
  @Override
  public TransactionListener[] getListeners() {
    synchronized (txListeners) {
      if (txListeners.isEmpty()) {
        return EMPTY_LISTENERS;
      }
      // An exactly-sized target array is filled in place and returned by toArray.
      return txListeners.toArray(new TransactionListener[txListeners.size()]);
    }
  }

  /**
   * Replaces the registered listener(s) with {@code newListener} and closes the listener that
   * was previously registered, if any.
   *
   * @param newListener the listener to install; null leaves the manager without listeners
   * @return the previously registered listener, or null if there was none
   * @throws IllegalStateException if more than one listener was registered; in that case the
   *         registered listeners are left untouched
   */
  @Override
  public TransactionListener setListener(TransactionListener newListener) {
    synchronized (txListeners) {
      // Throws before anything is modified when several listeners are registered.
      final TransactionListener previous = getListener();

      txListeners.clear();
      if (newListener != null) {
        txListeners.add(newListener);
      }

      if (previous != null) {
        closeListener(previous);
      }
      return previous;
    }
  }

  /**
   * Registers a transaction listener unless it is already registered.
   *
   * @param aListener the listener to add
   * @throws IllegalArgumentException if {@code aListener} is null
   */
  @Override
  public void addListener(TransactionListener aListener) {
    if (aListener == null) {
      throw new IllegalArgumentException(
          "addListener parameter was null");
    }
    synchronized (txListeners) {
      final boolean alreadyRegistered = txListeners.contains(aListener);
      if (alreadyRegistered) {
        return;
      }
      txListeners.add(aListener);
    }
  }

  /**
   * Unregisters a transaction listener and closes it if it was registered.
   *
   * @param aListener the listener to remove
   * @throws IllegalArgumentException if {@code aListener} is null
   */
  @Override
  public void removeListener(TransactionListener aListener) {
    if (aListener == null) {
      throw new IllegalArgumentException(
          "removeListener parameter was null");
    }
    synchronized (txListeners) {
      final boolean wasRegistered = txListeners.remove(aListener);
      if (wasRegistered) {
        // Closed while still holding the listener lock, as registration changes are.
        closeListener(aListener);
      }
    }
  }

  /**
   * Closes and removes every registered listener, then registers the given listeners.
   *
   * <p>
   * The null-element check runs after the existing listeners have been closed and removed, so a
   * rejected array leaves the manager without listeners.
   *
   * @param newListeners the listeners to register; null or empty registers none
   * @throws IllegalArgumentException if {@code newListeners} contains a null element
   */
  @Override
  public void initListeners(TransactionListener[] newListeners) {
    synchronized (txListeners) {
      for (final TransactionListener existing : txListeners) {
        closeListener(existing);
      }
      txListeners.clear();

      if (newListeners == null || newListeners.length == 0) {
        return;
      }
      final List<TransactionListener> replacements = Arrays.asList(newListeners);
      if (replacements.contains(null)) {
        throw new IllegalArgumentException(
            "initListeners parameter had a null element");
      }
      txListeners.addAll(replacements);
    }
  }

  /**
   * @return the statistics object that transaction outcomes are recorded on
   */
  CachePerfStats getCachePerfStats() {
    return this.cachePerfStats;
  }

  /**
   * Builds a new {@link TXId} for this member whose unique id is the previous unique id plus
   * one.
   *
   * <p>
   * Special case: once the unique id has reached {@code Integer.MAX_VALUE} it is reset to one
   * instead of being incremented, which prevents integer overflow to {@code Integer.MIN_VALUE}.
   */
  private TXId getNewTXId() {
    final int nextUniqId =
        uniqId.updateAndGet(current -> current == Integer.MAX_VALUE ? 1 : current + 1);
    return new TXId(distributionMgrId, nextUniqId);
  }

  /**
   * Starts a transaction on the calling thread: builds a new {@link TXId}, wraps it in a new
   * transaction state proxy, binds that proxy to the thread through a {@link ThreadLocal} and
   * records it among the locally initiated transactions.
   *
   * @throws IllegalStateException if the thread already has a transaction in progress, or has
   *         paused its transaction
   */
  @Override
  public void begin() {
    checkClosed();

    final TransactionId activeId = getTransactionId();
    if (activeId != null) {
      throw new IllegalStateException(
          String.format("Transaction %s already in progress",
              activeId));
    }
    if (txContext.get() == PAUSED) {
      throw new IllegalStateException(
          "Current thread has paused its transaction so it can not start a new transaction");
    }

    final TXId id = getNewTXId();
    // A distributed transaction is coordinated from here; otherwise a plain proxy is used.
    final TXStateProxyImpl proxy = isDistributed()
        ? new DistTXStateProxyImplOnCoordinator(cache, this, id, null, statisticsClock)
        : new TXStateProxyImpl(cache, this, id, null, statisticsClock);

    setTXState(proxy);
    if (logger.isDebugEnabled()) {
      logger.debug("begin tx: {}", proxy);
    }
    localTxMap.put(id, proxy);
  }


  /**
   * Starts a transaction that is flagged to be enlisted with a JTA transaction: builds a new
   * {@link TXId}, wraps it in a new transaction state proxy and binds that proxy to the calling
   * thread through a {@link ThreadLocal}. Should only be called in a context where we know there
   * is no existing transaction.
   *
   * @return the newly created transaction state
   */
  public TXStateProxy beginJTA() {
    checkClosed();

    final TXId id = getNewTXId();
    final TXStateProxy jtaState = isDistributed()
        ? new DistTXStateProxyImplOnCoordinator(cache, this, id, true, statisticsClock)
        : new TXStateProxyImpl(cache, this, id, true, statisticsClock);

    setTXState(jtaState);
    return jtaState;
  }

  /**
   * Runs the precommit step of the transaction bound to the calling thread. Only applicable for
   * distributed transactions.
   *
   * @throws IllegalStateException if the thread has no active transaction
   * @throws CommitConflictException declared for the precommit step of the transaction state
   */
  public void precommit() throws CommitConflictException {
    checkClosed();

    final TXStateProxy current = getTXState();
    if (current == null) {
      throw new IllegalStateException(
          "Thread does not have an active transaction");
    }

    // A JTA-enlisted transaction has to be completed through the JTA manager instead.
    current.checkJTA(
        "Can not commit this transaction because it is enlisted with a JTA transaction, use the JTA manager to perform the commit.");

    current.precommit();
  }

  /**
   * Complete the transaction associated with the current thread. When this method completes, the
   * thread is no longer associated with a transaction, except when the commit fails with an
   * {@link UnsupportedOperationInTransactionException}, in which case the transaction is bound
   * to the thread again.
   *
   * <p>
   * For every other failure the outcome is saved for client failover and the transaction is
   * cleaned up before the exception is rethrown. Failure statistics and
   * {@code afterFailedCommit} callbacks are only triggered for a
   * {@link CommitConflictException}.
   *
   * @throws IllegalStateException if the thread has no active transaction
   * @throws CommitConflictException if the commit fails because of a conflict
   */
  @Override
  public void commit() throws CommitConflictException {
    checkClosed();

    final TXStateProxy tx = getTXState();
    if (tx == null) {
      throw new IllegalStateException(
          "Thread does not have an active transaction");
    }

    // A JTA-enlisted transaction has to be committed through the JTA manager instead.
    tx.checkJTA(
        "Can not commit this transaction because it is enlisted with a JTA transaction, use the JTA manager to perform the commit.");

    final long opStart = statisticsClock.getTime();
    final long lifeTime = opStart - tx.getBeginTime();
    try {
      // Detach the transaction from the thread before committing it.
      setTXState(null);
      tx.commit();
    } catch (CommitConflictException ex) {
      saveTXStateForClientFailover(tx, TXCommitMessage.CMT_CONFLICT_MSG); // fixes #43350
      noteCommitFailure(opStart, lifeTime, tx);
      cleanup(tx.getTransactionId()); // fixes #52086
      throw ex;
    } catch (TransactionDataRebalancedException reb) {
      saveTXStateForClientFailover(tx, TXCommitMessage.REBALANCE_MSG);
      cleanup(tx.getTransactionId()); // fixes #52086
      throw reb;
    } catch (UnsupportedOperationInTransactionException e) {
      // fix for #42490
      // Re-attach the transaction to the thread; no cleanup happens on this path.
      setTXState(tx);
      throw e;
    } catch (RuntimeException e) {
      saveTXStateForClientFailover(tx, TXCommitMessage.EXCEPTION_MSG);
      cleanup(tx.getTransactionId()); // fixes #52086
      throw e;
    }
    // Success path: remember the outcome for failover, release the transaction, then record
    // statistics and notify listeners.
    saveTXStateForClientFailover(tx);
    cleanup(tx.getTransactionId());
    noteCommitSuccess(opStart, lifeTime, tx);
  }

  /**
   * Records a failed commit in the statistics and, when the transaction fires callbacks, invokes
   * {@code afterFailedCommit} on every registered listener. An exception thrown by one listener
   * is logged and does not prevent the remaining listeners from being notified.
   *
   * @param opStart clock reading taken when the commit started
   * @param lifeTime time between the begin of the transaction and {@code opStart}
   * @param tx the transaction that failed to commit
   */
  void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) {
    final long elapsed = statisticsClock.getTime() - opStart;
    cachePerfStats.txFailure(elapsed, lifeTime, tx.getChanges());

    final TransactionListener[] registered = getListeners();
    if (!tx.isFireCallbacks() || registered.length == 0) {
      return;
    }

    final TXEvent event = tx.getEvent();
    try {
      for (final TransactionListener listener : registered) {
        try {
          listener.afterFailedCommit(event);
        } catch (VirtualMachineError err) {
          SystemFailure.initiateFailure(err);
          // Should initiateFailure ever return, propagate the error: this JVM is
          // poisoned and the thread must not carry on.
          throw err;
        } catch (Throwable t) {
          // Catching Throwable requires the VirtualMachineError handler above, and a
          // cascading failure is still possible, so confirm the JVM is usable before
          // logging.
          SystemFailure.checkFailure();
          logger.error("Exception occurred in TransactionListener", t);
        }
      }
    } finally {
      event.release();
    }
  }

  /**
   * Records a successful commit in the statistics and, when the transaction fires callbacks,
   * invokes {@code afterCommit} on every registered listener. An exception thrown by one
   * listener is logged and does not prevent the remaining listeners from being notified.
   *
   * @param opStart clock reading taken when the commit started
   * @param lifeTime time between the begin of the transaction and {@code opStart}
   * @param tx the transaction that was committed
   */
  void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) {
    final long elapsed = statisticsClock.getTime() - opStart;
    cachePerfStats.txSuccess(elapsed, lifeTime, tx.getChanges());

    final TransactionListener[] registered = getListeners();
    if (!tx.isFireCallbacks() || registered.length == 0) {
      return;
    }

    final TXEvent event = tx.getEvent();
    try {
      for (final TransactionListener listener : registered) {
        try {
          listener.afterCommit(event);
        } catch (VirtualMachineError err) {
          SystemFailure.initiateFailure(err);
          // Should initiateFailure ever return, propagate the error: this JVM is
          // poisoned and the thread must not carry on.
          throw err;
        } catch (Throwable t) {
          // Catching Throwable requires the VirtualMachineError handler above, and a
          // cascading failure is still possible, so confirm the JVM is usable before
          // logging.
          SystemFailure.checkFailure();
          logger.error("Exception occurred in TransactionListener", t);
        }
      }
    } finally {
      event.release();
    }
  }

  /**
   * Rolls back the transaction bound to the calling thread. When this method completes, the
   * thread is no longer associated with a transaction.
   *
   * @throws IllegalStateException if the thread has no active transaction
   */
  @Override
  public void rollback() {
    checkClosed();

    final TXStateProxy current = getTXState();
    if (current == null) {
      throw new IllegalStateException(
          "Thread does not have an active transaction");
    }

    // A JTA-enlisted transaction has to be rolled back through the JTA manager instead.
    current.checkJTA(
        "Can not rollback this transaction is enlisted with a JTA transaction, use the JTA manager to perform the rollback.");

    final long opStart = statisticsClock.getTime();
    final long lifeTime = opStart - current.getBeginTime();

    // Detach from the thread first, then undo, remember the outcome and release.
    setTXState(null);
    current.rollback();
    saveTXStateForClientFailover(current);
    cleanup(current.getTransactionId());
    noteRollbackSuccess(opStart, lifeTime, current);
  }

  /**
   * Records a rollback in the statistics and, when the transaction fires callbacks, invokes
   * {@code afterRollback} on every registered listener. An exception thrown by one listener is
   * logged and does not prevent the remaining listeners from being notified.
   *
   * @param opStart clock reading taken when the rollback started
   * @param lifeTime time between the begin of the transaction and {@code opStart}
   * @param tx the transaction that was rolled back
   */
  void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) {
    final long elapsed = statisticsClock.getTime() - opStart;
    cachePerfStats.txRollback(elapsed, lifeTime, tx.getChanges());

    final TransactionListener[] registered = getListeners();
    if (!tx.isFireCallbacks() || registered.length == 0) {
      return;
    }

    final TXEvent event = tx.getEvent();
    try {
      for (final TransactionListener listener : registered) {
        try {
          listener.afterRollback(event);
        } catch (VirtualMachineError err) {
          SystemFailure.initiateFailure(err);
          // Should initiateFailure ever return, propagate the error: this JVM is
          // poisoned and the thread must not carry on.
          throw err;
        } catch (Throwable t) {
          // Catching Throwable requires the VirtualMachineError handler above, and a
          // cascading failure is still possible, so confirm the JVM is usable before
          // logging.
          SystemFailure.checkFailure();
          logger.error("Exception occurred in TransactionListener", t);
        }
      }
    } finally {
      event.release();
    }
  }

  /**
   * Called from commit and rollback: drops the transaction from the locally initiated
   * transactions (closing its proxy) and unparks every thread queued in the wait map for it.
   *
   * @param txId id of the transaction that has finished
   */
  private void cleanup(TransactionId txId) {
    final TXStateProxy finished = localTxMap.remove(txId);
    if (finished != null) {
      finished.close();
    }

    final Queue<Thread> parkedThreads = waitMap.get(txId);
    if (parkedThreads == null || parkedThreads.isEmpty()) {
      return;
    }
    for (final Thread parked : parkedThreads) {
      LockSupport.unpark(parked);
    }
    waitMap.remove(txId);
  }

  /**
   * Reports whether the calling thread currently has a transaction. A paused transaction does
   * not count.
   *
   * @return true if a transaction state is bound to this thread
   */
  @Override
  public boolean exists() {
    return getTXState() != null;
  }

  /**
   * Returns the identifier of the calling thread's transaction.
   *
   * @return the current transaction identifier, or null if no transaction exists
   */
  @Override
  public TransactionId getTransactionId() {
    final TXStateProxy current = getTXState();
    return current == null ? null : current.getTransactionId();
  }

  /**
   * Returns the transaction state bound to the calling thread.
   *
   * <p>
   * A paused transaction is reported as no transaction. A bound state that is no longer in
   * progress is unbound from the thread and also reported as no transaction.
   *
   * @return the thread's {@link TXStateProxy}, or null if it has no transaction
   */
  public TXStateProxy getTXState() {
    final TXStateProxy bound = txContext.get();
    if (bound == null || bound == PAUSED) {
      return null;
    }
    if (bound.isInProgress()) {
      return bound;
    }
    // Stale state: drop it so later lookups see no transaction.
    txContext.set(null);
    return null;
  }

  /**
   * Updates the in-progress flag of the transaction state bound to the calling thread, if there
   * is one. This method must only be used in fail-over scenarios.
   *
   * @param progress value of the progress flag to be set
   * @return the previous value of the flag, or false when no state is bound to the thread
   * @see TXStateProxy#setInProgress(boolean)
   */
  public boolean setInProgress(boolean progress) {
    final TXStateProxy bound = txContext.get();
    assert bound != PAUSED;
    if (bound == null) {
      return false;
    }
    final boolean previous = bound.isInProgress();
    bound.setInProgress(progress);
    return previous;
  }

  /**
   * Binds the given transaction state to the calling thread, replacing whatever was bound.
   *
   * @param val the state to bind; null unbinds
   */
  public void setTXState(TXStateProxy val) {
    this.txContext.set(val);
  }


  /**
   * Closes this manager: marks it closed, closes every hosted and every locally initiated
   * transaction state, and finally closes all registered listeners. Does nothing when the
   * manager is already closed.
   */
  public void close() {
    if (isClosed()) {
      return;
    }

    final TXStateProxy[] hosted;
    synchronized (hostedTXStates) {
      // After this, newly added TXStateProxy would not operate on the TXState.
      closed = true;
      hosted = hostedTXStates.values().toArray(new TXStateProxy[0]);
    }

    // Each hosted state is closed while holding its own lock.
    for (final TXStateProxy hostedProxy : hosted) {
      hostedProxy.getLock().lock();
      try {
        hostedProxy.close();
      } finally {
        hostedProxy.getLock().unlock();
      }
    }

    for (final TXStateProxy localProxy : localTxMap.values()) {
      localProxy.close();
    }

    for (final TransactionListener listener : getListeners()) {
      closeListener(listener);
    }
  }

  /**
   * Closes a listener, logging anything it throws instead of propagating it. The one exception
   * is a {@link VirtualMachineError}, which is reported as a system failure and rethrown.
   *
   * @param tl the listener to close
   */
  private void closeListener(TransactionListener tl) {
    try {
      tl.close();
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // Should initiateFailure ever return, propagate the error: this JVM is poisoned
      // and the thread must not carry on.
      throw err;
    } catch (Throwable t) {
      // Catching Throwable requires the VirtualMachineError handler above, and a
      // cascading failure is still possible, so confirm the JVM is usable before
      // logging.
      SystemFailure.checkFailure();
      logger.error("Exception occurred in TransactionListener", t);
    }
  }

  // Sentinel stored in txContext while a thread has paused its transaction. It is compared by
  // identity, and getTXState() reports it as "no transaction".
  @Immutable
  private static final TXStateProxy PAUSED = new PausedTXStateProxyImpl();

  /**
   * If the current thread is in a transaction then pause will cause it to no longer be in a
   * transaction. The same thread is expected to unpause/resume the transaction later. The thread
   * should not start a new transaction after it paused a transaction.
   *
   * @return the state of the transaction or null. Pass this value to
   *         {@link TXManagerImpl#unpauseTransaction} to reactivate the paused/suspended
   *         transaction.
   */
  public TXStateProxy pauseTransaction() {
    final boolean sameThreadMustResume = true;
    return internalSuspend(sameThreadMustResume);
  }

  /**
   * If the current thread is in a transaction then internal suspend will cause it to no longer
   * be in a transaction. The thread can start a new transaction after it internal suspended a
   * transaction.
   *
   * @return the state of the transaction or null. Pass this value to
   *         {@link TXManagerImpl#internalResume(TXStateProxy)} to reactivate the suspended
   *         transaction.
   */
  public TXStateProxy internalSuspend() {
    final boolean sameThreadMustResume = false;
    return internalSuspend(sameThreadMustResume);
  }

  /**
   * If the current thread is in a transaction then suspend will cause it to no longer be in a
   * transaction. When the thread has no transaction and the same thread must resume, the
   * thread's JTA-paused flag is raised instead.
   *
   * @param needToResumeBySameThread whether a suspended transaction needs to be resumed by the
   *        same thread.
   * @return the state of the transaction or null. Pass this value to
   *         {@link TXManagerImpl#internalResume(TXStateProxy, boolean)} to reactivate the
   *         suspended transaction.
   */
  private TXStateProxy internalSuspend(boolean needToResumeBySameThread) {
    final TXStateProxy active = getTXState();
    if (active == null) {
      if (needToResumeBySameThread) {
        // pausedJTA is set to true when JTA is not yet bootstrapped.
        pauseJTA.set(true);
      }
      return null;
    }

    active.suspend();
    // The PAUSED marker records that only this thread may bring the transaction back.
    setTXState(needToResumeBySameThread ? PAUSED : null);
    return active;
  }

  /**
   * Activates the specified transaction on the calling thread. Only the thread that paused the
   * transaction can unpause it.
   *
   * @param tx the transaction to be unpaused.
   * @throws IllegalStateException if this thread already has an active transaction or this
   *         thread did not pause the transaction.
   */
  public void unpauseTransaction(TXStateProxy tx) {
    final boolean sameThreadMustResume = true;
    internalResume(tx, sameThreadMustResume);
  }

  /**
   * Activates the specified transaction on the calling thread. The resuming thread does not
   * have to be the one that suspended it.
   *
   * @param tx the transaction to activate.
   * @throws IllegalStateException if this thread already has an active transaction
   */
  public void internalResume(TXStateProxy tx) {
    final boolean sameThreadMustResume = false;
    internalResume(tx, sameThreadMustResume);
  }

  /**
   * Activates the specified transaction on the calling thread. When {@code tx} is null and the
   * same thread must resume, the thread's JTA-paused flag is cleared instead.
   *
   * @param tx the transaction to activate.
   * @param needToResumeBySameThread whether a suspended transaction needs to be resumed by the
   *        same thread.
   * @throws IllegalStateException if this thread already has an active transaction, or if the
   *         same thread must resume but this thread has not paused a transaction
   */
  private void internalResume(TXStateProxy tx, boolean needToResumeBySameThread) {
    if (tx == null) {
      if (needToResumeBySameThread) {
        pauseJTA.set(false);
      }
      return;
    }

    final TransactionId activeId = getTransactionId();
    if (activeId != null) {
      throw new IllegalStateException(
          String.format("Transaction %s already in progress",
              activeId));
    }
    if (needToResumeBySameThread && txContext.get() != PAUSED) {
      throw new IllegalStateException(
          "try to unpause a transaction not paused by the same thread");
    }

    setTXState(tx);
    tx.resume();
  }

  /**
   * @return true if the calling thread has paused its transaction and not yet unpaused it
   */
  public boolean isTransactionPaused() {
    final TXStateProxy bound = txContext.get();
    return bound == PAUSED;
  }

  /**
   * @return true if the calling thread's JTA-paused flag is set; false when it is cleared or
   *         was never set
   */
  public boolean isJTAPaused() {
    // An unset thread-local yields null, which equals() treats as "not paused".
    return Boolean.TRUE.equals(pauseJTA.get());
  }

  /**
   * Activates the specified transaction on the calling thread.
   *
   * @param tx the transaction to activate
   * @deprecated use {@link #internalResume(TXStateProxy)} instead
   */
  @Deprecated
  public void resume(TXStateProxy tx) {
    this.internalResume(tx);
  }

  /**
   * @return true once {@link #close()} has marked this manager closed
   */
  public boolean isClosed() {
    return this.closed;
  }

  /**
   * Guards transactional entry points: consults the cache's cancel criterion first and then
   * rejects the call if this manager has been closed.
   *
   * @throws TXManagerCancelledException if this manager is closed
   */
  private void checkClosed() {
    cache.getCancelCriterion().checkCancelInProgress(null);
    if (!closed) {
      return;
    }
    throw new TXManagerCancelledException("This transaction manager is closed.");
  }

  /**
   * @return the distribution manager obtained from the cache's distributed system
   */
  DistributionManager getDM() {
    return this.dm;
  }

  /**
   * Returns the unique id of the calling thread's transaction on the current manager instance.
   *
   * @return the unique id, or {@link #NOTX} when there is no current instance or the thread has
   *         no transaction
   */
  public static int getCurrentTXUniqueId() {
    return currentInstance == null ? NOTX : currentInstance.getMyTXUniqueId();
  }

  /**
   * Returns the calling thread's transaction state on the current manager instance.
   *
   * @return the transaction state, or null when there is no current instance or the thread has
   *         no transaction
   */
  public static TXStateProxy getCurrentTXState() {
    return currentInstance == null ? null : currentInstance.getTXState();
  }

  /**
   * Returns the unique id of the transaction bound to the calling thread. Unlike
   * {@link #getTXState()}, this does not check whether the bound state is still in progress.
   *
   * @return the unique id, or {@link #NOTX} when nothing is bound or the transaction is paused
   */
  public int getMyTXUniqueId() {
    final TXStateProxy bound = txContext.get();
    if (bound == null || bound == PAUSED) {
      return NOTX;
    }
    return bound.getTxId().getUniqId();
  }

  /**
   * Associates the remote transaction state with the thread processing this message and
   * acquires the lock of the state this thread will operate on. Some messages, like
   * SizeMessage, should not create a new transaction state.
   *
   * <p>
   * If locking shows that the hosted state was replaced in the meantime, the lookup and locking
   * are repeated against the replacement until locking succeeds or no state is found.
   *
   * @param msg the transactional message being processed
   * @return the {@link TXStateProxy} for the transactional message; null if the message carries
   *         no transaction id, cannot participate in a transaction, or no state is available
   */
  public TXStateProxy masqueradeAs(TransactionMessage msg) throws InterruptedException {
    if (msg.getTXUniqId() == NOTX || !msg.canParticipateInTransaction()) {
      return null;
    }

    final TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
    TXStateProxy hosted = getOrSetHostedTXState(key, msg);
    while (hosted != null && !getLock(hosted, key)) {
      // The state we locked is stale; fetch whatever is hosted now and try again.
      hosted = getOrSetHostedTXState(key, msg);
    }

    if (logger.isDebugEnabled()) {
      logger.debug("masqueradeAs tx {} for msg {} ", hosted, msg);
    }
    setTXState(hosted);
    return hosted;
  }

  /**
   * Looks up the hosted transaction state for {@code key}, creating and registering one when it
   * is missing and the message is allowed to start a remote transaction.
   *
   * @param key id of the remote transaction
   * @param msg the message being processed; decides whether a state may be created and which
   *        kind
   * @return the hosted state, or null if there is none and the message may not start one
   */
  TXStateProxy getOrSetHostedTXState(TXId key, TransactionMessage msg) {
    // First look without the lock; a miss is re-checked under the lock below.
    final TXStateProxy existing = hostedTXStates.get(key);
    if (existing != null) {
      return existing;
    }

    synchronized (hostedTXStates) {
      TXStateProxy hosted = hostedTXStates.get(key);
      if (hosted != null || !msg.canStartRemoteTransaction()) {
        return hosted;
      }

      if (msg.isTransactionDistributed()) {
        hosted = new DistTXStateProxyImplOnDatanode(cache, this, key,
            msg.getTXOriginatorClient(), statisticsClock);
        hosted.setLocalTXState(new DistTXState(hosted, true, statisticsClock));
      } else {
        hosted = new TXStateProxyImpl(cache, this, key, msg.getTXOriginatorClient(),
            statisticsClock);
        hosted.setLocalTXState(new TXState(hosted, true, statisticsClock));
        hosted.setTarget(cache.getDistributedSystem().getDistributedMember());
      }
      hostedTXStates.put(key, hosted);
      return hosted;
    }
  }

  /**
   * Acquires the lock of {@code val} for the calling thread, unless the thread already holds it,
   * and then reconciles {@code val} with what is currently hosted under {@code key}.
   *
   * @param val the transaction state to lock
   * @param key id of the transaction
   * @return false if a different state is now hosted under {@code key}, in which case the lock
   *         has been released again; true otherwise
   */
  boolean getLock(TXStateProxy val, TXId key) {
    if (val.getLock().isHeldByCurrentThread()) {
      return true;
    }

    val.getLock().lock();
    synchronized (hostedTXStates) {
      final TXStateProxy hostedNow = hostedTXStates.get(key);
      // Inflight op could be received later than TXFailover operation.
      if (hostedNow == null) {
        if (isHostedTxRecentlyCompleted(key)) {
          // Another thread should complete the transaction
          logger.info("{} has already finished.", val.getTxId());
        } else {
          // Failover op removed the val. It is possible that the same operation can be
          // executed twice by two threads, but data is consistent.
          hostedTXStates.put(key, val);
        }
      } else if (hostedNow != val) {
        // Failover op replaced with a new TXStateProxyImpl; the caller must use that one.
        val.getLock().unlock();
        return false;
      }
    }
    return true;
  }

  /**
   * Associates the remote transaction state with the thread processing this client message and
   * acquires the lock of the state this thread will operate on. A missing state is created and
   * registered; no local transaction state is installed on it here.
   *
   * @param msg the client message being processed
   * @param memberId the member on whose behalf the transaction runs
   * @param probeOnly - do not masquerade; just look up the TX state
   * @return the {@link TXStateProxy} for the transactional message, or null if the message
   *         carries no transaction id
   */
  public TXStateProxy masqueradeAs(Message msg, InternalDistributedMember memberId,
      boolean probeOnly) throws InterruptedException {
    if (msg.getTransactionId() == NOTX) {
      return null;
    }

    final TXId key = new TXId(memberId, msg.getTransactionId());
    TXStateProxy hosted = hostedTXStates.get(key);
    if (hosted == null) {
      synchronized (hostedTXStates) {
        hosted = hostedTXStates.get(key);
        if (hosted == null) {
          // TODO: Conditionally create object based on distributed or non-distributed tx mode
          final boolean distributed = msg instanceof TransactionMessage
              && ((TransactionMessage) msg).isTransactionDistributed();
          hosted = distributed
              ? new DistTXStateProxyImplOnDatanode(cache, this, key, memberId, statisticsClock)
              : new TXStateProxyImpl(cache, this, key, memberId, statisticsClock);
          hostedTXStates.put(key, hosted);
        }
      }
    }

    if (!probeOnly) {
      if (hosted != null && !hosted.getLock().isHeldByCurrentThread()) {
        hosted.getLock().lock();
        // add the TXStateProxy back to the map
        // in-case another thread removed it while we were waiting to lock.
        // This can happen during client transaction failover.
        synchronized (hostedTXStates) {
          hostedTXStates.put(key, hosted);
        }
      }
      setTXState(hosted);
    }

    if (logger.isDebugEnabled()) {
      logger.debug("masqueradeAs tx {} for client message {}", hosted,
          MessageType.getString(msg.getMessageType()));
    }
    return hosted;
  }

  /**
   * Makes {@code txState} the transactional state of the calling thread, acquiring its lock first
   * if the thread does not hold it already.
   *
   * @param txState the transactional state; must not be {@code null}
   */
  public void masqueradeAs(TXStateProxy txState) {
    assert txState != null;
    final boolean alreadyLocked = txState.getLock().isHeldByCurrentThread();
    if (!alreadyLocked) {
      txState.getLock().lock();
    }
    setTXState(txState);
    if (!logger.isDebugEnabled()) {
      return;
    }
    logger.debug("masqueradeAs tx {}", txState);
  }

  /**
   * Undoes a previous masquerade: clears the calling thread's TX state and releases the lock of
   * {@code tx}. For transactions on behalf of a client the last-operation time is refreshed
   * first. Does nothing when {@code tx} is {@code null}.
   *
   * @param tx the transaction state to detach from this thread, may be {@code null}
   */
  public void unmasquerade(TXStateProxy tx) {
    if (tx == null) {
      return;
    }
    if (tx.isOnBehalfOfClient()) {
      updateLastOperationTime(tx);
    }
    try {
      cleanupTransactionIfNoLongerHostCausedByFailover(tx);
    } finally {
      // always detach and unlock, even if the cleanup failed
      setTXState(null);
      tx.getLock().unlock();
    }
  }

  /**
   * Cleans up the local state of {@code tx} when it is no longer registered in hostedTXStates
   * because a failover command removed it.
   *
   * @param tx the transaction state to check
   */
  void cleanupTransactionIfNoLongerHostCausedByFailover(TXStateProxy tx) {
    synchronized (hostedTXStates) {
      if (hostedTXStates.containsKey(tx.getTxId())) {
        // still hosted here, nothing to clean up
        return;
      }
      if (!tx.isRealDealLocal()) {
        return;
      }
      final TXStateProxyImpl impl = (TXStateProxyImpl) tx;
      if (impl.isRemovedCausedByFailover()) {
        impl.getLocalRealDeal().cleanup();
      }
    }
  }

  /**
   * Records the current wall-clock time as the time of the last client operation on {@code tx}.
   *
   * @param tx the transaction state; must be a {@link TXStateProxyImpl}
   */
  void updateLastOperationTime(TXStateProxy tx) {
    final TXStateProxyImpl impl = (TXStateProxyImpl) tx;
    impl.setLastOperationTimeFromClient(System.currentTimeMillis());
  }

  /**
   * Removes and closes the hosted transaction state for {@code txId}; the removal is not marked
   * as caused by failover.
   *
   * @param txId the id of the transaction to remove
   * @return the removed TXStateProxy, or {@code null} if none was hosted
   */
  public TXStateProxy removeHostedTXState(TXId txId) {
    final boolean causedByFailover = false;
    return removeHostedTXState(txId, causedByFailover);
  }

  /**
   * Removes the hosted transaction state for {@code txId} and closes it.
   *
   * @param txId the id of the transaction to remove
   * @param causedByFailover when {@code true} the removed state is flagged as removed because of
   *        a failover
   * @return the removed TXStateProxy, or {@code null} if none was hosted
   */
  public TXStateProxy removeHostedTXState(TXId txId, boolean causedByFailover) {
    synchronized (hostedTXStates) {
      final TXStateProxy removed = hostedTXStates.remove(txId);
      if (removed == null) {
        return null;
      }
      removed.close();
      if (causedByFailover) {
        ((TXStateProxyImpl) removed).setRemovedCausedByFailover(true);
      }
      return removed;
    }
  }

  /**
   * Removes and closes the hosted transaction state of every id in {@code txIds}.
   *
   * @param txIds the ids of the transactions to remove
   */
  public void removeHostedTXState(Set<TXId> txIds) {
    final Iterator<TXId> ids = txIds.iterator();
    while (ids.hasNext()) {
      removeHostedTXState(ids.next());
    }
  }

  /**
   * Called when the CacheServer is shutdown. Closes and removes every hosted txState that exists
   * on behalf of a client.
   */
  protected void removeHostedTXStatesForClients() {
    synchronized (hostedTXStates) {
      for (Iterator<Entry<TXId, TXStateProxy>> it = hostedTXStates.entrySet().iterator(); it
          .hasNext();) {
        final Entry<TXId, TXStateProxy> hosted = it.next();
        if (!hosted.getValue().isOnBehalfOfClient()) {
          continue;
        }
        hosted.getValue().close();
        if (logger.isDebugEnabled()) {
          logger.debug("Cleaning up TXStateProxy for {}", hosted.getKey());
        }
        it.remove();
      }
    }
  }

  /**
   * Tells whether this txManager hosts a transaction with the given id whose real deal is local.
   *
   * @param txId the id of the transaction
   * @return true if the transaction is in progress, false otherwise
   */
  public boolean isHostedTxInProgress(TXId txId) {
    synchronized (hostedTXStates) {
      final TXStateProxy hosted = hostedTXStates.get(txId);
      return hosted != null && hosted.isRealDealLocal();
    }
  }

  /**
   * Returns the transaction state hosted under {@code txId}.
   *
   * @param txId the id of the transaction
   * @return the hosted state, or {@code null} if none is registered
   */
  public TXStateProxy getHostedTXState(TXId txId) {
    final TXStateProxy hosted;
    synchronized (hostedTXStates) {
      hosted = hostedTXStates.get(txId);
    }
    return hosted;
  }

  /**
   * Reports how many transaction states are currently registered in hostedTXStates.
   *
   * @return number of transaction in progress on behalf of remote nodes
   */
  public int hostedTransactionsInProgressForTest() {
    final int hostedCount;
    synchronized (hostedTXStates) {
      hostedCount = hostedTXStates.size();
    }
    return hostedCount;
  }

  /**
   * Reports how many entries localTxMap currently holds.
   *
   * @return the size of localTxMap
   */
  public int localTransactionsInProgressForTest() {
    final int localCount = localTxMap.size();
    return localCount;
  }

  /**
   * Closes and removes every hosted transaction state whose TXId names the departed member, then
   * expires the client transactions that were sent from the departed member acting as proxy.
   */
  @Override
  public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember id,
      boolean crashed) {
    synchronized (hostedTXStates) {
      for (Iterator<Map.Entry<TXId, TXStateProxy>> it = hostedTXStates.entrySet().iterator(); it
          .hasNext();) {
        final Map.Entry<TXId, TXStateProxy> hosted = it.next();
        final TXId txId = hosted.getKey();
        if (!txId.getMemberId().equals(id)) {
          continue;
        }
        hosted.getValue().close();
        if (logger.isDebugEnabled()) {
          logger.debug("Received memberDeparted, cleaning up txState:{}", txId);
        }
        it.remove();
      }
    }
    expireClientTransactionsSentFromDepartedProxy(id);
  }

  /** Membership callback; this implementation takes no action when a member joins. */
  @Override
  public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {}

  /** Membership callback; this implementation takes no action when quorum is lost. */
  @Override
  public void quorumLost(DistributionManager distributionManager,
      Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}

  /** Membership callback; this implementation takes no action when a member is suspected. */
  @Override
  public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
      InternalDistributedMember whoSuspected, String reason) {}


  /**
   * Collects the ids of all hosted transactions whose TXId names the given member.
   *
   * @param id the client's membership ID
   * @return a new set of the currently hosted TXIds for that member; empty if there are none
   */
  public Set<TXId> getTransactionsForClient(InternalDistributedMember id) {
    final Set<TXId> matching = new HashSet<>();
    synchronized (hostedTXStates) {
      for (TXId txId : hostedTXStates.keySet()) {
        if (txId.getMemberId().equals(id)) {
          matching.add(txId);
        }
      }
    }
    return matching;
  }

  /**
   * Collects the hosted transaction states whose TXId names the given member.
   *
   * @param id the client's membership ID
   * @return a new set of the currently hosted transaction states for that member; empty if there
   *         are none
   */
  public Set<TXStateProxy> getTransactionStatesForClient(InternalDistributedMember id) {
    final Set<TXStateProxy> states = new HashSet<>();
    synchronized (hostedTXStates) {
      final Iterator<Map.Entry<TXId, TXStateProxy>> it = hostedTXStates.entrySet().iterator();
      while (it.hasNext()) {
        final Map.Entry<TXId, TXStateProxy> hosted = it.next();
        if (!hosted.getKey().getMemberId().equals(id)) {
          continue;
        }
        states.add(hosted.getValue());
      }
    }
    return states;
  }

  /**
   * This method is only being invoked by pre geode 1.7.0 server during rolling upgrade now.
   * The remote server has already waited for transactionTimeToLive and asks this server to remove
   * the client transactions. Each transaction is only removed if it is over its transaction
   * timeout limit, i.e. there was no recent activity on it.
   *
   * @param txIds the ids of the client transactions to expire
   */
  public void removeExpiredClientTransactions(Set<TXId> txIds) {
    if (logger.isDebugEnabled()) {
      logger.debug("expiring the following transactions: {}", txIds);
    }
    synchronized (hostedTXStates) {
      final Iterator<TXId> ids = txIds.iterator();
      while (ids.hasNext()) {
        // only expire client transaction if no activity for the given transactionTimeToLive
        scheduleToRemoveExpiredClientTransaction(ids.next());
      }
    }
  }

  /**
   * Test hook: closes and removes the hosted states of the given transactions.
   *
   * @param txIds the ids of the transactions to remove
   * @param distribute not used by this method
   */
  @VisibleForTesting
  public void removeTransactions(Set<TXId> txIds, boolean distribute) {
    synchronized (hostedTXStates) {
      for (Iterator<Map.Entry<TXId, TXStateProxy>> it = hostedTXStates.entrySet().iterator(); it
          .hasNext();) {
        final Map.Entry<TXId, TXStateProxy> hosted = it.next();
        if (!txIds.contains(hosted.getKey())) {
          continue;
        }
        hosted.getValue().close();
        it.remove();
      }
    }
  }

  /**
   * Stores the commit message of a client-initiated transaction whose real deal is local in the
   * failover map. If the transaction has no commit message, the rollback token is stored instead.
   *
   * @param tx the completed transaction
   */
  void saveTXStateForClientFailover(TXStateProxy tx) {
    if (!tx.isOnBehalfOfClient() || !tx.isRealDealLocal()) {
      return;
    }
    final TXCommitMessage commitMessage;
    if (tx.getCommitMessage() == null) {
      commitMessage = TXCommitMessage.ROLLBACK_MSG;
    } else {
      commitMessage = tx.getCommitMessage();
    }
    failoverMap.put(tx.getTxId(), commitMessage);
    if (logger.isDebugEnabled()) {
      logger.debug(
          "TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
          tx.getTxId(), failoverMap.size());
    }
  }

  /**
   * Stores {@code msg} in the failover map for a client-initiated transaction whose real deal is
   * local; other transactions are ignored.
   *
   * @param tx the completed transaction
   * @param msg the commit message (or token) to remember for {@code tx}
   */
  private void saveTXStateForClientFailover(TXStateProxy tx, TXCommitMessage msg) {
    if (!tx.isOnBehalfOfClient() || !tx.isRealDealLocal()) {
      return;
    }
    failoverMap.put(tx.getTxId(), msg);
    if (logger.isDebugEnabled()) {
      logger.debug(
          "TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
          tx.getTxId(), failoverMap.size());
    }
  }

  /**
   * Unconditionally stores {@code msg} in the failover map under {@code txId}.
   *
   * @param txId the id of the transaction
   * @param msg the commit message (or token) to remember for that transaction
   */
  public void saveTXCommitMessageForClientFailover(TXId txId, TXCommitMessage msg) {
    failoverMap.put(txId, msg);
  }

  /**
   * Tells whether the failover map holds an entry for {@code txId}. As a side effect a found
   * entry is removed and re-inserted.
   *
   * @param txId the id of the transaction
   * @return {@code true} if the failover map contains {@code txId}
   */
  public boolean isHostedTxRecentlyCompleted(TXId txId) {
    synchronized (failoverMap) {
      if (!failoverMap.containsKey(txId)) {
        return false;
      }
      // Whoever asks whether we have the txId will come back for the commit message, and that
      // can take a long time (especially when called from TXFailoverCommand). Removing the entry
      // and putting it back moves the txId back in the linked map.
      failoverMap.put(txId, failoverMap.remove(txId));
      return true;
    }
  }


  /**
   * If the given transaction is hosted here with a local real deal, waits for a completion that
   * another thread may already have in progress.
   *
   * @param txId the id of the transaction
   * @return true if a wait was performed
   */
  public boolean waitForCompletingTransaction(TXId txId) {
    TXStateProxy hosted = hostedTXStates.get(txId);
    if (hosted == null) {
      synchronized (hostedTXStates) {
        hosted = hostedTXStates.get(txId);
      }
    }
    if (hosted == null || !hosted.isRealDealLocal()) {
      return false;
    }
    // Deliberately do NOT save the state in the failover map here: the thread we wait for has
    // already stored a TXCommitMessage, and storing again could replace an existing token such
    // as TXCommitMessage.REBALANCE_MSG with null (bug 42661).
    final TXState localState = ((TXStateProxyImpl) hosted).getLocalRealDeal();
    return localState.waitForPreviousCompletion();
  }

  /**
   * Looks up the entry stored in the failover map for a completed transaction.
   *
   * @param txId the id of the transaction
   * @return the commit message or an exception token e.g {@link TXCommitMessage#CMT_CONFLICT_MSG}
   *         if the transaction threw an exception
   * @see #isExceptionToken(TXCommitMessage)
   */
  public TXCommitMessage getRecentlyCompletedMessage(TXId txId) {
    final TXCommitMessage stored = failoverMap.get(txId);
    return stored;
  }

  /**
   * Checks, by identity, whether {@code msg} is one of the three tokens that stand for an
   * exception (commit conflict, rebalance, generic exception).
   *
   * @param msg the message to test
   * @return true if msg is an exception token, false otherwise
   */
  public boolean isExceptionToken(TXCommitMessage msg) {
    if (msg == TXCommitMessage.CMT_CONFLICT_MSG) {
      return true;
    }
    if (msg == TXCommitMessage.REBALANCE_MSG) {
      return true;
    }
    return msg == TXCommitMessage.EXCEPTION_MSG;
  }

  /**
   * Builds the exception that corresponds to one of the three TXCommitMessage tokens which
   * represent a failure during transaction execution.
   *
   * @param msg the token that represents the exception
   * @param txId the id of the transaction, used in the conflict message
   * @return the exception
   * @throws InternalGemFireError if {@code msg} is not one of the exception tokens
   */
  public RuntimeException getExceptionForToken(TXCommitMessage msg, TXId txId) {
    final RuntimeException result;
    if (msg == TXCommitMessage.CMT_CONFLICT_MSG) {
      final String text = String.format("Conflict detected in GemFire transaction %s", txId);
      result = new CommitConflictException(text);
    } else if (msg == TXCommitMessage.REBALANCE_MSG) {
      result = new TransactionDataRebalancedException(
          "Transactional data moved, due to rebalancing.");
    } else if (msg == TXCommitMessage.EXCEPTION_MSG) {
      result = new TransactionInDoubtException("Commit failed on cache server");
    } else {
      throw new InternalGemFireError("the parameter TXCommitMessage is not an exception token");
    }
    return result;
  }

  /**
   * Expires the hosted client transactions named in {@code txIds}. With a non-positive timeout
   * the states are removed immediately; otherwise each hosted transaction is scheduled for
   * removal after the timeout.
   *
   * @param txIds the ids of the disconnected client's transactions
   * @param distribute when {@code true}, the other members are asked to expire the transactions
   *        as well
   */
  public void expireDisconnectedClientTransactions(Set<TXId> txIds, boolean distribute) {
    // increase the client transaction timeout setting to avoid a late in-flight client operation
    // preventing the expiration of the client transaction.
    long timeout = (long) (TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()) * 1.1);
    if (timeout <= 0) {
      removeHostedTXState(txIds);
    }
    synchronized (hostedTXStates) {
      // Snapshot the matching ids before acting on them: with a non-positive timeout
      // scheduleToRemoveClientTransaction removes the entry from hostedTXStates, and removing
      // from the map while iterating over it would raise a ConcurrentModificationException.
      List<TXId> hostedIds = new ArrayList<>();
      for (TXId txId : hostedTXStates.keySet()) {
        if (txIds.contains(txId)) {
          hostedIds.add(txId);
        }
      }
      for (TXId txId : hostedIds) {
        scheduleToRemoveClientTransaction(txId, timeout);
      }
    }
    if (distribute) {
      expireClientTransactionsOnRemoteServer(txIds);
    }
  }

  /**
   * Sends an ExpireDisconnectedClientTransactionsMessage carrying {@code txIds} to the other
   * distribution manager ids known to {@code dm}.
   *
   * @param txIds the ids of the client transactions to expire remotely
   */
  void expireClientTransactionsOnRemoteServer(Set<TXId> txIds) {
    // tell other VMs to also add tasks to expire the transactions
    ExpireDisconnectedClientTransactionsMessage.send(dm,
        dm.getOtherDistributionManagerIds(), txIds);
  }

  /**
   * Expires the transaction state for the given id. A non-positive timeout removes the state
   * immediately; otherwise a timer task is scheduled that removes it after {@code timeout} if it
   * is over its transaction timeout limit by then. While the task is pending the id is tracked in
   * scheduledToBeRemovedTx, if that tracking set exists.
   *
   * @param txId the id of the client transaction
   * @param timeout the delay passed to the timer
   */
  void scheduleToRemoveClientTransaction(TXId txId, long timeout) {
    if (timeout <= 0) {
      removeHostedTXState(txId);
      return;
    }
    if (scheduledToBeRemovedTx != null) {
      scheduledToBeRemovedTx.add(txId);
    }
    final SystemTimerTask removalTask = new SystemTimerTask() {
      @Override
      public void run2() {
        scheduleToRemoveExpiredClientTransaction(txId);
        if (scheduledToBeRemovedTx != null) {
          scheduledToBeRemovedTx.remove(txId);
        }
      }
    };
    getCache().getCCPTimer().schedule(removalTask, timeout);
  }

  /**
   * Closes and removes the hosted state for {@code txId}, but only if it is hosted and is over
   * its transaction timeout limit.
   *
   * @param txId the id of the client transaction
   */
  void scheduleToRemoveExpiredClientTransaction(TXId txId) {
    synchronized (hostedTXStates) {
      final TXStateProxy hosted = hostedTXStates.get(txId);
      if (hosted == null) {
        return;
      }
      if (!((TXStateProxyImpl) hosted).isOverTransactionTimeoutLimit()) {
        return;
      }
      hosted.close();
      hostedTXStates.remove(txId);
    }
  }

  /**
   * Transactions that are currently suspended, keyed by transaction id. An entry is added when a
   * transaction is suspended and removed when it is resumed or when its expiry task fires.
   */
  private final ConcurrentMap<TransactionId, TXStateProxy> suspendedTXs =
      new ConcurrentHashMap<>();

  /**
   * Suspends the calling thread's transaction, using minutes as the unit of the suspended
   * transaction timeout.
   *
   * @return the id of the suspended transaction, or {@code null} if the thread has none
   */
  @Override
  public TransactionId suspend() {
    final TimeUnit expiryUnit = TimeUnit.MINUTES;
    return suspend(expiryUnit);
  }

  /**
   * Suspends the calling thread's transaction: detaches it from the thread, records it as
   * suspended, wakes up one thread waiting to resume it (if any) and schedules its expiry.
   *
   * @param expiryTimeUnit the time unit in which the suspended transaction timeout is interpreted
   * @return the id of the suspended transaction, or {@code null} if the thread has none
   */
  TransactionId suspend(TimeUnit expiryTimeUnit) {
    final TXStateProxy current = getTXState();
    if (current == null) {
      return null;
    }
    final TransactionId txId = current.getTransactionId();
    current.suspend();
    setTXState(null);
    suspendedTXs.put(txId, current);
    // wake up waiting threads
    final Queue<Thread> waiters = waitMap.get(txId);
    if (waiters != null) {
      // take waiters off the queue until one is found that is not the current thread
      Thread candidate;
      do {
        candidate = waiters.poll();
      } while (candidate != null && Thread.currentThread().equals(candidate));
      if (candidate != null) {
        LockSupport.unpark(candidate);
      }
    }
    scheduleExpiry(txId, expiryTimeUnit);
    return txId;
  }

  /**
   * Resumes the suspended transaction with the given id on the calling thread.
   *
   * @param transactionId the id of the transaction to resume
   * @throws IllegalStateException if the id is {@code null}, if the calling thread already has a
   *         transaction, or if no suspended transaction with that id exists
   */
  @Override
  public void resume(TransactionId transactionId) {
    final String notResumable =
        "Trying to resume unknown transaction, or transaction resumed by another thread";
    if (transactionId == null) {
      throw new IllegalStateException(notResumable);
    }
    if (getTXState() != null) {
      throw new IllegalStateException(
          "Cannot resume transaction, current thread has an active transaction");
    }
    final TXStateProxy suspended = suspendedTXs.remove(transactionId);
    if (suspended == null) {
      throw new IllegalStateException(notResumable);
    }
    resumeProxy(suspended);
  }

  /**
   * Tells whether a transaction with the given id is currently recorded as suspended.
   *
   * @param transactionId the id of the transaction
   * @return {@code true} if the transaction is suspended
   */
  @Override
  public boolean isSuspended(TransactionId transactionId) {
    final boolean suspended = suspendedTXs.containsKey(transactionId);
    return suspended;
  }

  /**
   * Resumes the suspended transaction with the given id on the calling thread, if possible.
   *
   * @param transactionId the id of the transaction to resume
   * @return {@code true} if the transaction was resumed; {@code false} if the id is
   *         {@code null}, the thread already has a transaction, or no such suspended transaction
   *         exists
   */
  @Override
  public boolean tryResume(TransactionId transactionId) {
    if (transactionId == null || getTXState() != null) {
      return false;
    }
    final TXStateProxy suspended = suspendedTXs.remove(transactionId);
    if (suspended == null) {
      return false;
    }
    resumeProxy(suspended);
    return true;
  }

  /**
   * Attaches {@code txProxy} to the calling thread, resumes it and cancels its pending expiry
   * task, if one is registered.
   *
   * @param txProxy the transaction to resume; must not be {@code null}
   */
  private void resumeProxy(TXStateProxy txProxy) {
    assert txProxy != null;
    assert getTXState() == null;
    setTXState(txProxy);
    txProxy.resume();
    final SystemTimerTask pendingExpiry = expiryTasks.remove(txProxy.getTransactionId());
    if (pendingExpiry != null && pendingExpiry.cancel()) {
      cache.purgeCCPTimer();
    }
  }

  /**
   * this map keeps track of all the threads that are waiting in
   * {@link #tryResume(TransactionId, long, TimeUnit)} for a particular transactionId. A waiting
   * thread removes itself from its queue when it stops waiting.
   */
  private final ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new ConcurrentHashMap<>();

  /**
   * Returns the queue of threads waiting to resume the given transaction.
   *
   * @param transactionId the id of the transaction
   * @return the wait queue, or {@code null} if none has been created for that id
   */
  Queue<Thread> getWaitQueue(TransactionId transactionId) {
    final Queue<Thread> waiters = waitMap.get(transactionId);
    return waiters;
  }

  /**
   * Returns the wait queue for the given transaction, creating and registering one if necessary.
   * If another thread registers a queue concurrently, that thread's queue is returned.
   *
   * @param transactionId the id of the transaction
   * @return the wait queue registered for that id, never {@code null}
   */
  private Queue<Thread> getOrCreateWaitQueue(TransactionId transactionId) {
    final Queue<Thread> existing = getWaitQueue(transactionId);
    if (existing != null) {
      return existing;
    }
    final Queue<Thread> created = new ConcurrentLinkedQueue<>();
    final Queue<Thread> raced = waitMap.putIfAbsent(transactionId, created);
    return raced != null ? raced : created;
  }

  /**
   * Tries to resume the given transaction on the calling thread, waiting up to the given time
   * for it to become suspended. While waiting, the thread is registered in the transaction's
   * wait queue; it is always removed from the queue before this method returns.
   *
   * @param transactionId the id of the transaction to resume
   * @param time the maximum time to wait
   * @param unit the unit of {@code time}
   * @return {@code true} if the transaction was resumed; {@code false} if the id is
   *         {@code null}, the thread already has a transaction, the transaction does not exist
   *         (any more), or the time ran out
   */
  @Override
  public boolean tryResume(TransactionId transactionId, long time, TimeUnit unit) {
    if (transactionId == null || getTXState() != null || !exists(transactionId)) {
      return false;
    }
    final Thread self = Thread.currentThread();
    final long deadlineNanos = System.nanoTime() + unit.toNanos(time);
    final Queue<Thread> waiters = getOrCreateWaitQueue(transactionId);

    try {
      for (;;) {
        // (re-)register: a suspending thread takes waiters off the queue when waking them up
        if (!waiters.contains(self)) {
          waiters.add(self);
        }
        if (tryResume(transactionId)) {
          return true;
        }
        if (!exists(transactionId)) {
          return false;
        }
        final long remainingNanos = deadlineNanos - System.nanoTime();
        if (remainingNanos <= 0) {
          return false;
        }
        parkToRetryResume(remainingNanos);
      }
    } finally {
      waiters.remove(self);
      // the queue itself will be removed at commit/rollback
    }
  }

  /**
   * Parks the calling thread for at most {@code timeout} nanoseconds before the next attempt to
   * resume a transaction.
   *
   * @param timeout the maximum time to park, in nanoseconds
   */
  void parkToRetryResume(long timeout) {
    LockSupport.parkNanos(timeout);
  }

  /**
   * Tells whether the given transaction is known to this manager: hosted and in progress,
   * suspended, or present in localTxMap.
   *
   * @param transactionId the id of the transaction; cast to {@link TXId} for the hosted check
   * @return {@code true} if the transaction exists
   */
  @Override
  public boolean exists(TransactionId transactionId) {
    if (isHostedTxInProgress((TXId) transactionId)) {
      return true;
    }
    if (isSuspended(transactionId)) {
      return true;
    }
    return localTxMap.containsKey(transactionId);
  }

  /**
   * The timeout after which any suspended transactions are rolled back if they are not resumed. If
   * a negative timeout is passed, suspended transactions will never expire. The value is
   * interpreted in the time unit given when the transaction is suspended; {@link #suspend()} uses
   * minutes.
   *
   * @param timeout the timeout in minutes
   */
  public void setSuspendedTransactionTimeout(long timeout) {
    suspendedTXTimeout = timeout;
  }

  /**
   * Return the timeout after which suspended transactions are rolled back. A negative value means
   * that suspended transactions are not scheduled to expire.
   *
   * @return the timeout in minutes
   * @see #setSuspendedTransactionTimeout(long)
   */
  public long getSuspendedTransactionTimeout() {
    return suspendedTXTimeout;
  }

  /**
   * map to track the scheduled expiry tasks of suspended transactions, keyed by transaction id.
   * A task is registered when a suspended transaction is scheduled to expire and is removed (and
   * cancelled) when the transaction is resumed.
   */
  private final ConcurrentMap<TransactionId, SystemTimerTask> expiryTasks =
      new ConcurrentHashMap<>();

  /**
   * Schedules the suspended transaction to expire after {@link #suspendedTXTimeout} and registers
   * the expiry task so that it can be cancelled on resume. Nothing is scheduled when the timeout
   * is negative.
   *
   * @param txId the id of the suspended transaction
   * @param expiryTimeUnit the time unit to use when scheduling the expiration
   */
  private void scheduleExpiry(TransactionId txId, TimeUnit expiryTimeUnit) {
    final boolean debugEnabled;
    if (suspendedTXTimeout < 0) {
      debugEnabled = logger.isDebugEnabled();
      if (debugEnabled) {
        logger.debug("TX: transaction: {} not scheduled to expire", txId);
      }
      return;
    }
    final SystemTimerTask expiryTask = new TXExpiryTask(txId);
    if (logger.isDebugEnabled()) {
      logger.debug("TX: scheduling transaction: {} to expire after:{}", txId, suspendedTXTimeout);
    }
    // the timer expects milliseconds
    cache.getCCPTimer().schedule(expiryTask, expiryTimeUnit.toMillis(suspendedTXTimeout));
    expiryTasks.put(txId, expiryTask);
  }

  /**
   * Task scheduled to expire a transaction when it is suspended. This task gets canceled if the
   * transaction is resumed. When it fires, it rolls the transaction back if it is still suspended.
   */
  public static class TXExpiryTask extends SystemTimerTask {

    /**
     * The txId to expire
     */
    private final TransactionId txId;

    /**
     * @param txId the id of the suspended transaction this task expires
     */
    public TXExpiryTask(TransactionId txId) {
      this.txId = txId;
    }

    /**
     * Rolls back the transaction if it is still suspended. Does nothing when there is no current
     * TXManagerImpl instance or the transaction is no longer suspended.
     */
    @Override
    public void run2() {
      TXManagerImpl mgr = TXManagerImpl.currentInstance;
      if (mgr == null) {
        // no transaction manager instance is available, so there is nothing to expire
        return;
      }
      // This task has fired, so drop its bookkeeping entry; the two-argument remove leaves a
      // newer task registered for the same id untouched.
      mgr.expiryTasks.remove(txId, this);
      TXStateProxy tx = mgr.suspendedTXs.remove(txId);
      if (tx != null) {
        try {
          if (logger.isDebugEnabled()) {
            logger.debug("TX: Expiry task rolling back transaction: {}", txId);
          }
          tx.rollback();
        } catch (GemFireException e) {
          logger.warn(String.format(
              "Exception occurred while rolling back timed out transaction %s", txId), e);
        }
      }
    }
  }
  /**
   * Entry creator for the ref-count map: the map value itself serves as the hash entry, so
   * creating an entry only links the value to the next entry of its chain.
   */
  private static class RefCountMapEntryCreator implements
      CustomEntryConcurrentHashMap.HashEntryCreator<AbstractRegionEntry, RefCountMapEntry> {
    @Override
    public HashEntry<AbstractRegionEntry, RefCountMapEntry> newEntry(AbstractRegionEntry key,
        int hash, HashEntry<AbstractRegionEntry, RefCountMapEntry> next, RefCountMapEntry value) {
      final RefCountMapEntry entry = value;
      entry.setNextEntry(next);
      return entry;
    }

    @Override
    public int keyHashCode(Object key, boolean compareValues) {
      // key will always be an AbstractRegionEntry because our map is strongly typed.
      final AbstractRegionEntry regionEntry = (AbstractRegionEntry) key;
      return regionEntry.getEntryHash();
    }
  }
  /**
   * Hash entry that doubles as its own map value and carries a reference count for one
   * AbstractRegionEntry. A new entry starts with a count of 1.
   */
  private static class RefCountMapEntry
      implements HashEntry<AbstractRegionEntry, RefCountMapEntry> {

    private static final AtomicIntegerFieldUpdater<RefCountMapEntry> refCountUpdater =
        AtomicIntegerFieldUpdater.newUpdater(RefCountMapEntry.class, "refCount");

    private final AbstractRegionEntry key;

    private HashEntry<AbstractRegionEntry, RefCountMapEntry> next;

    /** Only modified through {@link #refCountUpdater} after construction. */
    private volatile int refCount;

    public RefCountMapEntry(AbstractRegionEntry k) {
      this.key = k;
      this.refCount = 1;
    }

    @Override
    public AbstractRegionEntry getKey() {
      return this.key;
    }

    @Override
    public boolean isKeyEqual(Object k) {
      return this.key.equals(k);
    }

    @Override
    public RefCountMapEntry getMapValue() {
      // the entry is its own value
      return this;
    }

    @Override
    public void setMapValue(RefCountMapEntry newValue) {
      if (newValue == this) {
        return;
      }
      throw new IllegalStateException("Expected newValue " + newValue + " to be this " + this);
    }

    @Override
    public int getEntryHash() {
      return this.key.getEntryHash();
    }

    @Override
    public HashEntry<AbstractRegionEntry, RefCountMapEntry> getNextEntry() {
      return this.next;
    }

    @Override
    public void setNextEntry(HashEntry<AbstractRegionEntry, RefCountMapEntry> n) {
      this.next = n;
    }

    /**
     * Atomically adds one to the reference count.
     */
    public void incRefCount() {
      refCountUpdater.incrementAndGet(this);
    }

    /**
     * Atomically subtracts one from the reference count.
     *
     * @return true if refCount goes to 0
     * @throws IllegalStateException if the count drops below 0
     */
    public boolean decRefCount() {
      final int rc = refCountUpdater.decrementAndGet(this);
      if (rc >= 0) {
        return rc == 0;
      }
      throw new IllegalStateException("rc=" + rc);
    }
  }

  /**
   * Reference counts per AbstractRegionEntry. Entries are created through
   * {@link RefCountMapEntryCreator}; the map is updated by {@link #incRefCount} and
   * {@link #decRefCount}.
   */
  private final CustomEntryConcurrentHashMap<AbstractRegionEntry, RefCountMapEntry> refCountMap =
      new CustomEntryConcurrentHashMap<>(
          CustomEntryConcurrentHashMap.DEFAULT_INITIAL_CAPACITY,
          CustomEntryConcurrentHashMap.DEFAULT_LOAD_FACTOR,
          CustomEntryConcurrentHashMap.DEFAULT_CONCURRENCY_LEVEL, true,
          new RefCountMapEntryCreator());

  /**
   * Callback passed to the ref-count map's create call by {@link #incRefCount}: it supplies a
   * fresh RefCountMapEntry as new value and increments the count of a value that was read.
   * Removal is not supported by this callback.
   */
  @Immutable
  private static final MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object> incCallback =
      new MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object>() {
        @Override
        public RefCountMapEntry newValue(AbstractRegionEntry key, Object context,
            Object createParams) {
          // a new RefCountMapEntry starts with a reference count of 1
          return new RefCountMapEntry(key);
        }

        @Override
        public void oldValueRead(RefCountMapEntry value) {
          value.incRefCount();
        }

        @Override
        public boolean doRemoveValue(RefCountMapEntry value, Object context, Object removeParams) {
          throw new IllegalStateException("doRemoveValue should not be called from create");
        }
      };

  /**
   * Callback passed to the ref-count map's removeConditionally call by {@link #decRefCount}: it
   * decrements the count of the value and answers whether the count reached zero. Creating or
   * reading values is not supported by this callback.
   */
  @Immutable
  private static final MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object> decCallback =
      new MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object>() {
        @Override
        public RefCountMapEntry newValue(AbstractRegionEntry key, Object context,
            Object createParams) {
          throw new IllegalStateException("newValue should not be called from remove");
        }

        @Override
        public void oldValueRead(RefCountMapEntry value) {
          throw new IllegalStateException("oldValueRead should not be called from remove");
        }

        @Override
        public boolean doRemoveValue(RefCountMapEntry value, Object context, Object removeParams) {
          // true only when the decrement brought the count to zero
          return value.decRefCount();
        }
      };

  /**
   * Increments the reference count kept for {@code re} by the current TXManagerImpl instance.
   * Does nothing when there is no current instance.
   *
   * @param re the region entry whose reference count is incremented
   */
  public static void incRefCount(AbstractRegionEntry re) {
    final TXManagerImpl mgr = currentInstance;
    if (mgr == null) {
      return;
    }
    mgr.refCountMap.create(re, incCallback, null, null, true);
  }

  /**
   * Decrements the reference count kept for {@code re} by the current TXManagerImpl instance.
   *
   * @param re the region entry whose reference count is decremented
   * @return {@code true} if the conditional removal from the ref-count map returned an entry, or
   *         if there is no current TXManagerImpl instance
   */
  public static boolean decRefCount(AbstractRegionEntry re) {
    final TXManagerImpl mgr = currentInstance;
    if (mgr == null) {
      return true;
    }
    return mgr.refCountMap.removeConditionally(re, decCallback, null, null) != null;
  }

  /**
   * Used by tests.
   *
   * @return the key set of localTxMap (not a copy)
   */
  public Set<TXId> getLocalTxIds() {
    final Set<TXId> localIds = localTxMap.keySet();
    return localIds;
  }

  /**
   * Used by tests.
   *
   * @return a new list holding the ids of all currently hosted transactions
   */
  public ArrayList<TXId> getHostedTxIds() {
    final ArrayList<TXId> hostedIds;
    synchronized (hostedTXStates) {
      hostedIds = new ArrayList<>(hostedTXStates.keySet());
    }
    return hostedIds;
  }

  /**
   * Test hook that overrides the transaction time-to-live.
   *
   * @param seconds the new time-to-live, in seconds
   */
  public void setTransactionTimeToLiveForTest(int seconds) {
    transactionTimeToLive = seconds;
  }

  /**
   * Returns the configured transaction time-to-live. The expiry methods of this class treat a
   * non-positive value as "remove immediately".
   *
   * @return the time-to-live for abandoned transactions, in seconds
   */
  public int getTransactionTimeToLive() {
    return transactionTimeToLive;
  }

  /**
   * @return the member id held in the distributionMgrId field
   */
  public InternalDistributedMember getMemberId() {
    return distributionMgrId;
  }

  /**
   * Expires the transaction states of the lost proxy server based on the timeout setting. With a
   * non-positive time-to-live the states are removed right away; otherwise a timer task removes
   * them after the time-to-live. Nothing is done when the cache is already closed.
   *
   * @param proxyServer the departed proxy server
   */
  private void expireClientTransactionsSentFromDepartedProxy(
      InternalDistributedMember proxyServer) {
    if (cache.isClosed()) {
      return;
    }
    final long timeout = getTransactionTimeToLive() * 1000L;
    if (timeout <= 0) {
      removeTransactionsSentFromDepartedProxy(proxyServer);
      return;
    }
    if (departedProxyServers != null) {
      departedProxyServers.add(proxyServer);
    }
    final SystemTimerTask removalTask = new SystemTimerTask() {
      @Override
      public void run2() {
        removeTransactionsSentFromDepartedProxy(proxyServer);
        if (departedProxyServers != null) {
          departedProxyServers.remove(proxyServer);
        }
      }
    };
    try {
      cache.getCCPTimer().schedule(removalTask, timeout);
    } catch (IllegalStateException ise) {
      if (!cache.isClosed()) {
        throw ise;
      }
      // The task could not be scheduled because the cache is closing,
      // so do not leave the proxy server recorded in the test hook.
      if (departedProxyServers != null) {
        departedProxyServers.remove(proxyServer);
      }
    }
  }

  /**
   * Test hook: the departed proxy servers whose transactions are waiting to be removed. Only
   * allocated when the "trackScheduledToBeRemovedTx" system property (with the GemFire prefix) is
   * set to true; otherwise {@code null}.
   */
  private final Set<InternalDistributedMember> departedProxyServers =
      Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx")
          ? ConcurrentHashMap.newKeySet() : null;

  /**
   * provide a test hook to track departed peers
   *
   * @return the tracked departed proxy servers, or {@code null} when tracking is not enabled
   */
  public Set<InternalDistributedMember> getDepartedProxyServers() {
    return departedProxyServers;
  }

  /**
   * Finds all client originated transactions whose recorded proxy server is the departed one and
   * closes and removes them from the hostedTXStates map.
   *
   * @param proxyServer the departed proxy server
   */
  public void removeTransactionsSentFromDepartedProxy(InternalDistributedMember proxyServer) {
    final Set<TXId> expired = getTransactionsSentFromDepartedProxy(proxyServer);
    if (expired.isEmpty()) {
      return;
    }
    if (logger.isDebugEnabled()) {
      logger.debug("expiring the following transactions: {}", Arrays.toString(expired.toArray()));
    }
    synchronized (hostedTXStates) {
      for (Iterator<Map.Entry<TXId, TXStateProxy>> it = hostedTXStates.entrySet().iterator(); it
          .hasNext();) {
        final Map.Entry<TXId, TXStateProxy> hosted = it.next();
        if (!expired.contains(hosted.getKey())) {
          continue;
        }
        // No other proxy server updated this TXState (the client would have failed over to
        // another proxy server), so it is removed now that transactionTimeToLive has passed.
        hosted.getValue().close();
        it.remove();
      }
    }
  }

  /**
   * Retrieves the ids of the hosted client transactions (with a local real deal) whose TXState
   * records the given proxy server. If a transaction failed over, the new proxy server
   * information should be stored in its TXState, so it is not reported here.
   *
   * @param proxyServer the departed proxy server
   * @return a new set with the ids of the matching transactions; empty if there are none
   */
  private Set<TXId> getTransactionsSentFromDepartedProxy(InternalDistributedMember proxyServer) {
    final Set<TXId> matching = new HashSet<>();
    synchronized (hostedTXStates) {
      for (Map.Entry<TXId, TXStateProxy> hosted : hostedTXStates.entrySet()) {
        final TXStateProxy proxy = hosted.getValue();
        if (!proxy.isRealDealLocal() || !proxy.isOnBehalfOfClient()) {
          continue;
        }
        final TXState localState = (TXState) ((TXStateProxyImpl) proxy).realDeal;
        if (proxyServer.equals(localState.getProxyServer())) {
          matching.add(hosted.getKey());
        }
      }
    }
    return matching;
  }

  /**
   * Sets the transaction mode (distributed or not).
   *
   * @param flag the requested mode
   * @throws IllegalStateException if the calling thread has an active transaction and
   *         {@code flag} differs from the current mode
   */
  @Override
  public void setDistributed(boolean flag) {
    checkClosed();
    final TXStateProxy active = getTXState();
    // The mode cannot be changed in the middle of a transaction; setting the same mode again
    // while a transaction is in progress is allowed.
    if (active != null && flag != isDistributed()) {
      throw new IllegalStateException(
          "Transaction mode cannot be changed when the thread has an active transaction");
    }
    isTXDistributed.set(flag);
  }

  /**
   * If the mode was explicitly set using setDistributed, this returns that value. Otherwise it
   * returns the distributed-transactions setting of the distributed system's original
   * configuration.
   *
   * @return {@code true} if transactions are distributed
   */
  @Override
  public boolean isDistributed() {
    final Boolean explicit = isTXDistributed.get();
    if (explicit != null) {
      return explicit;
    }
    // not set through setDistributed(): fall back to the configuration
    final InternalDistributedSystem ids = (InternalDistributedSystem) cache.getDistributedSystem();
    return ids.getOriginalConfig().getDistributedTransactions();
  }

  /**
   * @return the hostedTXStates map itself, not a copy; the methods of this class synchronize on
   *         this map when modifying or iterating it
   */
  Map<TXId, TXStateProxy> getHostedTXStates() {
    return hostedTXStates;
  }

  /**
   * @return {@code true} if no transaction state is currently registered in hostedTXStates
   */
  public boolean isHostedTXStatesEmpty() {
    final boolean noneHosted = hostedTXStates.isEmpty();
    return noneHosted;
  }

  /**
   * @return the set tracking the ids of client transactions that have a removal task pending;
   *         may be {@code null} (its users in this class null-check it)
   */
  public Set<TXId> getScheduledToBeRemovedTx() {
    return scheduledToBeRemovedTx;
  }

  /**
   * Test hook.
   *
   * @return the number of entries currently held in the failover map
   */
  @VisibleForTesting
  public int getFailoverMapSize() {
    final int entries = failoverMap.size();
    return entries;
  }

}
