/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;

import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.locks.ElderState;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Determines which member of the membership view is the "elder" and lazily creates the local
 * {@link ElderState} on request.
 *
 * <p>
 * The elder candidate is the first member of the current view that is neither an admin-only
 * member nor a surprise member (see {@link #getElderCandidates()}).
 *
 * <p>
 * Creation of the {@link ElderState} is serialized by {@code elderLock}; the volatile
 * {@code elderStateInitialized} flag is only set after {@code elderState} has been assigned, so
 * readers that observe the flag as {@code true} may read {@code elderState} without locking.
 */
public class ClusterElderManager {
  private static final Logger logger = LogService.getLogger();

  private final ClusterDistributionManager clusterDistributionManager;
  private final StoppableReentrantLock elderLock;

  // Written only while holding elderLock; published to lock-free readers via the volatile flag.
  private ElderState elderState;
  private volatile boolean elderStateInitialized;
  private final Supplier<ElderState> elderStateSupplier;

  /**
   * Creates a manager whose {@link ElderState} is built directly from the given distribution
   * manager.
   *
   * @param clusterDistributionManager the distribution manager this elder manager serves
   */
  public ClusterElderManager(ClusterDistributionManager clusterDistributionManager) {
    this(clusterDistributionManager, () -> new ElderState(clusterDistributionManager));
  }

  /**
   * Creates a manager with a caller-provided factory for the {@link ElderState}.
   *
   * @param clusterDistributionManager the distribution manager this elder manager serves
   * @param elderStateSupplier invoked at most once per successful creation, under the elder lock,
   *        the first time the elder state is needed
   */
  public ClusterElderManager(ClusterDistributionManager clusterDistributionManager,
      Supplier<ElderState> elderStateSupplier) {
    this.clusterDistributionManager = clusterDistributionManager;
    this.elderLock = new StoppableReentrantLock(clusterDistributionManager.getCancelCriterion());
    this.elderStateSupplier = elderStateSupplier;
  }

  /**
   * Based on a recent membership view, return a member that might be the next elder.
   *
   * @return the first elder candidate, possibly this VM, or {@code null} if the view contains no
   *         eligible member
   */
  InternalDistributedMember getElderCandidate() {
    return getElderCandidates().stream().findFirst().orElse(null);
  }

  /**
   * Returns the view members that are eligible to be the elder, in view order.
   *
   * @return every view member that is not an admin-only member and is not a surprise member
   */
  List<InternalDistributedMember> getElderCandidates() {
    List<InternalDistributedMember> theMembers = clusterDistributionManager.getViewMembers();

    return theMembers.stream()
        .filter(member -> member.getVmKind() != ClusterDistributionManager.ADMIN_ONLY_DM_TYPE)
        .filter(
            member -> !clusterDistributionManager.getMembershipManager().isSurpriseMember(member))
        .collect(Collectors.toList());
  }

  /**
   * Returns the id of the current elder candidate.
   *
   * @return the current elder candidate, or {@code null} if there is none
   * @throws DistributedSystemDisconnectedException if the distribution manager is closing
   */
  public InternalDistributedMember getElderId() throws DistributedSystemDisconnectedException {
    if (clusterDistributionManager.isCloseInProgress()) {
      throw new DistributedSystemDisconnectedException(
          "no valid elder when system is shutting down",
          clusterDistributionManager.getRootCause());
    }
    // Let the system's cancel criterion abort the call before the view is consulted.
    clusterDistributionManager.getSystem().getCancelCriterion().checkCancelInProgress(null);

    return getElderCandidate();
  }

  /**
   * @return {@code true} if the local member is the current elder candidate
   */
  public boolean isElder() {
    return clusterDistributionManager.getId().equals(getElderCandidate());
  }

  /**
   * Returns the local {@link ElderState}, creating it on first use.
   *
   * @param waitToBecomeElder if {@code true}, first block in {@link #waitForElder} until that
   *        method returns for the local member, then return the elder state unconditionally; if
   *        {@code false}, return {@code null} immediately when the local member is not the elder
   * @return the elder state, or {@code null} if {@code waitToBecomeElder} is {@code false} and
   *         the local member is not the elder
   * @throws InterruptedException if interrupted while waiting to become the elder
   */
  public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
    if (waitToBecomeElder) {
      // NOTE(review): the result of waitForElder is ignored, so the elder state is still
      // returned/created when the wait ended because of shutdown or because the local member
      // left the view - confirm this is intended.
      waitForElder(clusterDistributionManager.getId());
    }

    // Check the flag first: when the caller asked to wait, the isElder() view scan is irrelevant
    // to the outcome and is skipped by short-circuit evaluation.
    if (!waitToBecomeElder && !isElder()) {
      return null; // early return if this clusterDistributionManager is not the elder
    }

    if (elderStateInitialized) {
      return elderState;
    }
    return initializeElderState();
  }

  /**
   * Creates the {@link ElderState} if it does not exist yet and marks it as initialized.
   *
   * @return the existing or newly created elder state
   */
  private ElderState initializeElderState() {
    elderLock.lock();

    try {
      if (elderState == null) {
        elderState = elderStateSupplier.get();
      }
      // Set only after elderState is assigned so lock-free readers never see a stale null.
      elderStateInitialized = true;
      return elderState;
    } finally {
      elderLock.unlock();
    }
  }

  /**
   * Waits until (elder is desiredElder) or (desiredElder is no longer a member) or (the local
   * member is the elder) or (the distribution manager starts closing).
   *
   * @param desiredElder the member the caller expects to become the elder
   * @return true if desiredElder is the elder; false if it is no longer a member, the local
   *         member is the elder instead, or the distribution manager is closing
   * @throws InterruptedException if interrupted while waiting for a membership change
   */
  public boolean waitForElder(final InternalDistributedMember desiredElder)
      throws InterruptedException {
    MembershipChangeListener changeListener = new MembershipChangeListener();

    // Register before the first check so a change between the check and the wait is not missed.
    clusterDistributionManager.addMembershipListener(changeListener);

    try {
      while (true) {
        if (clusterDistributionManager.isCloseInProgress()) {
          return false;
        }
        final InternalDistributedMember currentElder = getElderCandidate();
        if (desiredElder.equals(currentElder)) {
          return true;
        }
        if (logger.isDebugEnabled()) {
          logger.debug("Expecting Elder to be {} but it is {}.",
              desiredElder, currentElder);
        }
        if (!clusterDistributionManager.isCurrentMember(desiredElder)) {
          return false; // no longer present
        }
        if (!clusterDistributionManager.getId().equals(desiredElder)
            && clusterDistributionManager.getId().equals(currentElder)) {
          // Once we become the elder we no longer allow anyone else to be the
          // elder so don't let them wait anymore.
          return false;
        }

        if (logger.isDebugEnabled()) {
          logger.debug("Waiting for membership to change");
        }
        changeListener.waitForMembershipChange();
      }
    } finally {
      clusterDistributionManager.removeMembershipListener(changeListener);
    }
  }

  /**
   * Membership listener that records that some membership event happened and wakes up any thread
   * blocked in {@link #waitForMembershipChange()}.
   */
  private static class MembershipChangeListener implements MembershipListener {
    /** Upper bound, in milliseconds, on a single wait for a membership event. */
    private static final long MEMBERSHIP_POLL_INTERVAL_MS = 100;

    // Guarded by this listener's monitor.
    private boolean changeOccurred = false;

    @Override
    public void memberJoined(DistributionManager distributionManager,
        InternalDistributedMember theId) {
      signalChange();
    }

    @Override
    public void memberDeparted(DistributionManager distributionManager,
        InternalDistributedMember theId, boolean crashed) {
      signalChange();
    }

    @Override
    public void memberSuspect(DistributionManager distributionManager,
        InternalDistributedMember id, InternalDistributedMember whoSuspected,
        String reason) {
      signalChange();
    }

    @Override
    public void quorumLost(DistributionManager distributionManager,
        Set<InternalDistributedMember> failures,
        List<InternalDistributedMember> remaining) {
      signalChange();
    }

    /** Records that a membership event occurred and wakes all waiting threads. */
    private synchronized void signalChange() {
      changeOccurred = true;
      notifyAll();
    }

    /**
     * Blocks until a membership event has been signalled or the poll interval elapses, whichever
     * comes first, then clears the pending-change flag.
     *
     * <p>
     * The wait is deliberately not wrapped in a loop: returning early (timeout or spurious
     * wakeup) is harmless because the caller re-evaluates the membership state after every
     * return.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized void waitForMembershipChange() throws InterruptedException {
      if (!changeOccurred) {
        wait(MEMBERSHIP_POLL_INTERVAL_MS);
      }
      changeOccurred = false;
    }
  }
}
