/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.locks.TXLockId;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * TXFarSideCMTracker tracks received and processed TXCommitMessages, for transactions that contain
 * changes for DACK regions. Its main purpose is to allow recovery in the event that the VM which
 * originated the TXCommitMessage exits the DistributedSystem. Tracking is done by using TXLockIds
 * or TXIds. It is designed for these failure cases:
 *
 * <ol>
 *
 * <li>The TX Originator has died during sending the second message which one or more of the
 * recipients (aka Far Siders) missed. To help in this case, each of the Far Siders will broadcast a
 * message to determine if the second commit message was received.</li>
 *
 * <li>The TX Grantor (the reservation system) has noticed that the TX Originator has died and
 * queries each of the Far Siders to determine if the reservation (aka <code>TXLockId</code>) given
 * to the TX Originator is no longer needed (the transaction has been processed)</li>
 *
 * <li>The TX Grantor has died and a new one is considering granting new reservations, but before
 * doing so must query all of the members to know if all the previously granted reservations (aka
 * <code>TXLockId</code>s) are no longer needed (the transactions have been processed)</li>
 *
 * </ol>
 *
 * @since GemFire 4.0
 *
 */
public class TXFarSideCMTracker {
  private static final Logger logger = LogService.getLogger();

  /**
   * How long, in milliseconds, {@link #waitForMessage} waits between checks of the cancel
   * criterion once its thread has been interrupted.
   */
  private static final long INTERRUPTED_WAIT_POLL_MILLIS = 100;

  /**
   * Commit messages that have been added but not yet processed or removed, keyed by their tracker
   * key. This map is also the monitor that guards itself, {@link #txHistory} and
   * {@link #lastHistoryItem}.
   */
  private final Map<Object, TXCommitMessage> txInProgress;

  /** Ring buffer holding the tracker keys of processed transactions; unused slots are null. */
  private final Object[] txHistory;

  /** Index of the {@link #txHistory} slot that the next processed key is written to. */
  private int lastHistoryItem;

  /**
   * Constructor for TXFarSideCMTracker
   *
   * @param historySize The number of processed transactions to remember in the event that fellow
   *        Far Siders did not receive the second message.
   */
  public TXFarSideCMTracker(int historySize) {
    txInProgress = new HashMap<Object, TXCommitMessage>();
    txHistory = new Object[historySize];
    lastHistoryItem = 0;
  }

  /**
   * @return the capacity of the processed-transaction history, as given to the constructor
   */
  public int getHistorySize() {
    return txHistory.length;
  }

  /**
   * Answers fellow "Far Siders" question about a DACK transaction when the transaction originator
   * died before it sent the CommitProcess message.
   *
   * <p>
   * If the message is known but is not being processed, it is marked so that it will not be
   * processed later, and false is returned.
   *
   * @param key the tracker key of the transaction being asked about
   * @return true if the matching message is being processed or its key is in the history of
   *         processed transactions; false otherwise
   */
  public boolean commitProcessReceived(Object key) {
    final TXCommitMessage message;
    synchronized (txInProgress) {
      message = (TXCommitMessage) getTxInProgress().get(key);
      if (foundTxInProgress(message) || foundFromHistory(key)) {
        return true;
      }
    }

    if (message == null) {
      return false;
    }

    synchronized (message) {
      // Re-check under the message's own monitor; processing may have started in the meantime.
      if (message.isProcessing()) {
        return true;
      }
      // Prevent any potential future processing of this message
      message.setDontProcess();
      return false;
    }
  }

  /**
   * @return the map of in-progress commit messages (the live map, not a copy)
   */
  Map getTxInProgress() {
    return txInProgress;
  }

  /**
   * @param message the in-progress message found for a key, or null if there was none
   * @return true if the message exists and reports that it is processing
   */
  boolean foundTxInProgress(TXCommitMessage message) {
    return null != message && message.isProcessing();
  }

  /**
   * Looks the key up in the history of processed transactions. The history is guarded by the
   * {@code txInProgress} monitor, which the caller is expected to hold.
   *
   * @param key the tracker key to look for; a null key is never found
   * @return true if the key is present in the history
   */
  boolean foundFromHistory(Object key) {
    if (key == null) {
      // Unused history slots are null, so a null key must not be treated as a match.
      return false;
    }
    for (int i = txHistory.length - 1; i >= 0; --i) {
      if (key.equals(txHistory[i])) {
        return true;
      }
    }
    return false;
  }

  /**
   * Answers new Grantor query regarding whether it can start handing out new locks. Waits until
   * txInProgress is empty.
   *
   * @throws InterruptedException if the calling thread is interrupted before or while waiting
   */
  public void waitForAllToProcess() throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException(); // wisest to do this before the synchronize below
    }
    // Assume that a thread interrupt is only set in the
    // case of a shutdown, in that case we don't need to wait
    // around any longer, propagating the interrupt is reasonable behavior
    boolean messageWritten = false;
    synchronized (txInProgress) {
      while (!txInProgress.isEmpty()) {
        logger.info("Lock grantor recovery is waiting for transactions to complete: {}",
            txInProgress);
        messageWritten = true;
        txInProgress.wait();
      }
    }
    if (messageWritten) {
      logger.info("Wait for transactions completed");
    }
  }

  /**
   * Answers existing Grantor's question about the status of a reservation/lock given to a
   * departed/ing Originator (this will most likely be called nearly the same time as
   * commitProcessReceived).
   *
   * <p>
   * First waits for the lock's owner to leave the distributed system. Then, if a commit message
   * for the lock is still in progress, waits until that message has been processed or has noticed
   * the departure. An interrupt ends the wait early and leaves the thread's interrupt flag set.
   *
   * @param lockId the reservation being asked about
   * @param dm the distribution manager used to observe membership
   */
  public void waitToProcess(TXLockId lockId, DistributionManager dm) {
    waitForMemberToDepart(lockId.getMemberId(), dm);
    final TXCommitMessage commitMessage;
    synchronized (txInProgress) {
      commitMessage = txInProgress.get(lockId);
    }
    if (commitMessage == null) {
      // Nothing is in progress for this lock, so there is nothing to wait for.
      return;
    }
    synchronized (commitMessage) {
      // tx in progress, we must wait until its done
      while (!(commitMessage.wasProcessed() || commitMessage.isDepartureNoticed())) {
        try {
          commitMessage.wait(100);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          logger.error(
              String.format("Waiting to complete on message %s caught an interrupted exception",
                  commitMessage),
              ie);
          break;
        }
      }
    }
  }

  /**
   * Register a <code>MembershipListener</code>, wait until the member is gone.
   *
   * <p>
   * Returns immediately if the member is not in the current membership. If the calling thread is
   * interrupted, the wait is abandoned and the interrupt flag is left set. The listener is always
   * removed before returning.
   */
  private void waitForMemberToDepart(final InternalDistributedMember memberId,
      DistributionManager dm) {
    if (!dm.getDistributionManagerIds().contains(memberId)) {
      return;
    }

    final Object lock = new Object();
    // Set under 'lock' by the listener. Recording the departure, rather than relying only on
    // notifyAll, means a departure that happens before this thread starts waiting is not lost.
    final boolean[] departed = {false};
    final MembershipListener memEar = new MembershipListener() {
      // MembershipListener implementation
      @Override
      public void memberJoined(DistributionManager distributionManager,
          InternalDistributedMember id) {}

      @Override
      public void memberSuspect(DistributionManager distributionManager,
          InternalDistributedMember id, InternalDistributedMember whoSuspected, String reason) {}

      @Override
      public void memberDeparted(DistributionManager distributionManager,
          InternalDistributedMember id, boolean crashed) {
        if (memberId.equals(id)) {
          synchronized (lock) {
            departed[0] = true;
            lock.notifyAll();
          }
        }
      }

      @Override
      public void quorumLost(DistributionManager distributionManager,
          Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
    };
    try {
      Set<?> memberSet = dm.addMembershipListenerAndGetDistributionManagerIds(memEar);

      // Still need to wait
      synchronized (lock) {
        while (!departed[0] && memberSet.contains(memberId)) {
          try {
            lock.wait();
            memberSet = dm.getDistributionManagerIds();
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            // TODO Log an error here
            return;
          }
        }
      }
    } finally {
      // Its gone (or we gave up waiting), we can proceed
      dm.removeMembershipListener(memEar);
    }
  }

  /**
   * Indicate that the transaction message has been processed and to place it in the transaction
   * history
   *
   * @param processedMess a message whose tracker key identifies the processed transaction
   * @return the in-progress message that was registered under that key, or null if there was none
   */
  public TXCommitMessage processed(TXCommitMessage processedMess) {
    final Object key = processedMess.getTrackerKey();
    final TXCommitMessage mess;
    synchronized (txInProgress) {
      mess = txInProgress.remove(key);
      if (mess != null) {
        recordInHistory(key);
        // For any waitForAllToProcess
        if (txInProgress.isEmpty()) {
          txInProgress.notifyAll();
        }
      }
    }
    if (mess != null) {
      synchronized (mess) {
        mess.setProcessed(true);
        // For any waitToProcess
        mess.notifyAll();
      }
    }
    return mess;
  }

  /**
   * Stores the key in the next slot of the history ring buffer, overwriting the oldest entry once
   * the buffer is full. Does nothing when the history has no capacity. The caller must hold the
   * {@code txInProgress} monitor.
   */
  private void recordInHistory(Object key) {
    if (txHistory.length == 0) {
      return;
    }
    txHistory[lastHistoryItem] = key;
    lastHistoryItem = (lastHistoryItem + 1) % txHistory.length;
  }

  /**
   * Indicate that this message is never going to be processed, typically used in the case where
   * none of the FarSiders received the CommitProcessMessage
   *
   * @param deadMess a message whose tracker key identifies the entry to drop
   **/
  public void removeMessage(TXCommitMessage deadMess) {
    synchronized (txInProgress) {
      txInProgress.remove(deadMess.getTrackerKey());
      // For any waitForAllToProcess
      if (txInProgress.isEmpty()) {
        txInProgress.notifyAll();
      }
    }
  }

  /**
   * Retrieve the commit message associated with the lock
   *
   * @param key the tracker key to look up
   * @return the in-progress message registered under the key, or null if there is none
   */
  public TXCommitMessage get(Object key) {
    synchronized (txInProgress) {
      return txInProgress.get(key);
    }
  }

  /**
   * Blocks until a commit message is registered under the given key, then returns it. The
   * distribution manager's cancel criterion is checked before every wait.
   *
   * <p>
   * An interrupt does not end the wait. It is remembered and the thread's interrupt flag is
   * restored when this method exits. The flag is deliberately not restored inside the loop: a
   * pending interrupt makes every subsequent {@code wait()} throw immediately, which would turn
   * the loop into a busy spin. After an interrupt the method switches to short timed waits so that
   * the cancel criterion keeps being checked.
   *
   * @param key the tracker key of the awaited message
   * @param dm the distribution manager whose cancel criterion is consulted
   * @return the message registered under the key; never null
   */
  public TXCommitMessage waitForMessage(Object key, DistributionManager dm) {
    boolean interrupted = false;
    try {
      synchronized (txInProgress) {
        TXCommitMessage msg = txInProgress.get(key);
        while (msg == null) {
          dm.getSystem().getCancelCriterion().checkCancelInProgress(null);
          try {
            if (interrupted) {
              txInProgress.wait(INTERRUPTED_WAIT_POLL_MILLIS);
            } else {
              txInProgress.wait();
            }
          } catch (InterruptedException e) {
            interrupted = true;
          }
          msg = txInProgress.get(key);
        }
        return msg;
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * The transaction commit message has been received
   *
   * @param msg the message to track; its tracker key must not be null
   */
  public void add(TXCommitMessage msg) {
    synchronized (txInProgress) {
      final Object key = msg.getTrackerKey();
      if (key == null) {
        Assert.assertTrue(false, "TXFarSideCMTracker must have a non-null key for message " + msg);
      }
      txInProgress.put(key, msg);
      // Wake any waitForMessage callers
      txInProgress.notifyAll();
    }
  }

  /**
   * Commit messages saved for client failover, in insertion order. Once the map grows beyond
   * {@code TXManagerImpl.FAILOVER_TX_MAP_SIZE} entries, the eldest entry is evicted on insert.
   */
  // TODO we really need to keep around only one msg for each thread on a client
  private final Map<TXId, TXCommitMessage> failoverMap =
      Collections.synchronizedMap(new LinkedHashMap<TXId, TXCommitMessage>() {
        @Override
        protected boolean removeEldestEntry(Entry<TXId, TXCommitMessage> eldest) {
          return size() > TXManagerImpl.FAILOVER_TX_MAP_SIZE;
        }
      });

  /**
   * Saves the commit message under the transaction id for later lookup via
   * {@link #getTXCommitMessage(TXId)}.
   */
  public void saveTXForClientFailover(TXId txId, TXCommitMessage msg) {
    failoverMap.put(txId, msg);
  }

  /**
   * @return the commit message saved for the transaction id, or null if none is held
   */
  public TXCommitMessage getTXCommitMessage(TXId txId) {
    return failoverMap.get(txId);
  }

  /**
   * a static TXFarSideCMTracker is held by TXCommitMessage and is cleared when the cache has
   * finished closing
   *
   * <p>
   * Clears the failover map and the processed-transaction history. In-progress messages are left
   * untouched.
   */
  public void clearForCacheClose() {
    failoverMap.clear();
    // The history is guarded by the txInProgress monitor everywhere else it is touched.
    synchronized (txInProgress) {
      lastHistoryItem = 0;
      Arrays.fill(txHistory, null);
    }
  }

  @VisibleForTesting
  public int getFailoverMapSize() {
    return failoverMap.size();
  }
}
