/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.persistence;

import java.util.HashSet;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.CacheDistributionAdvisor;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Works out which {@link InitialImageAdvice} a persistent region should act on while it is
 * initializing.
 *
 * <p>
 * {@link #getAdvice(InitialImageAdvice)} repeatedly asks the {@link CacheDistributionAdvisor} for
 * advice and consults the {@link InternalPersistenceAdvisor} about previously online members. It
 * returns as soon as there are replicates to work with, when no previously online members are
 * recorded, or once the tie lock has been acquired; otherwise it blocks until the membership
 * changes and tries again.
 */
public class PersistenceInitialImageAdvisor {

  private static final Logger logger = LogService.getLogger();

  private final InternalPersistenceAdvisor persistenceAdvisor;
  private final String shortDiskStoreID;
  private final String regionPath;
  private final CacheDistributionAdvisor cacheDistributionAdvisor;
  private final boolean hasDiskImageToRecoverFrom;

  /**
   * @param persistenceAdvisor source of persisted membership state, listeners and the tie lock
   * @param shortDiskStoreID disk store identifier, used only as a prefix in debug log messages
   * @param regionPath region path, used only as a prefix in debug log messages
   * @param cacheDistributionAdvisor advisor that produces the {@link InitialImageAdvice}
   * @param hasDiskImageToRecoverFrom whether this member has a disk image it could recover from
   */
  public PersistenceInitialImageAdvisor(InternalPersistenceAdvisor persistenceAdvisor,
      String shortDiskStoreID, String regionPath, CacheDistributionAdvisor cacheDistributionAdvisor,
      boolean hasDiskImageToRecoverFrom) {
    this.persistenceAdvisor = persistenceAdvisor;
    this.shortDiskStoreID = shortDiskStoreID;
    this.regionPath = regionPath;
    this.cacheDistributionAdvisor = cacheDistributionAdvisor;
    this.hasDiskImageToRecoverFrom = hasDiskImageToRecoverFrom;
  }

  /**
   * Loops until it can return advice describing how this member should initialize.
   *
   * <p>
   * A membership listener is registered on both advisors for the duration of the call and is
   * always removed again (and the "waiting on members" state reset) before the method exits. The
   * advisee's cancel criterion is checked at the top of every pass.
   *
   * @param previousAdvice advice from an earlier attempt, or {@code null} if there was none; it is
   *        passed through to {@link CacheDistributionAdvisor#adviseInitialImage}
   * @return the advice to act on. Its replicate set may have been emptied by this method when a
   *         replicate reported that this member has the same data on disk.
   */
  public InitialImageAdvice getAdvice(InitialImageAdvice previousAdvice) {
    MembershipChangeListener listener = new MembershipChangeListener(persistenceAdvisor);
    cacheDistributionAdvisor.addMembershipAndProxyListener(listener);
    persistenceAdvisor.addListener(listener);
    try {
      while (true) {
        cacheDistributionAdvisor.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
        try {
          // previousAdvice is whatever the caller supplied (null on a first attempt) until the
          // retry branch below resets it to null.
          InitialImageAdvice advice =
              cacheDistributionAdvisor.adviseInitialImage(previousAdvice, true);

          if (hasReplicates(advice)) {
            if (hasDiskImageToRecoverFrom) {
              removeReplicatesIfWeAreEqualToAnyOrElseClearEqualMembers(advice.getReplicates());
            }
            return advice;
          } else if (hasNonPersistentMember(advice)) {
            updateMembershipViewFromAnyPeer(advice.getNonPersistent(), hasDiskImageToRecoverFrom);
          }

          // Fix for 51698 - If there are online members that we previously failed to get a GII
          // from, retry those members rather than wait for new persistent members to recover.
          if (hasReplicates(previousAdvice)) {
            logger.info(
                "GII failed from all sources, but members are still online. Retrying the GII.");
            previousAdvice = null;
            continue;
          }

          Set<PersistentMemberID> previouslyOnlineMembers =
              persistenceAdvisor.getPersistedOnlineOrEqualMembers();

          // If there are no currently online members, and no previously online members, this member
          // should just go with what's on its own disk
          if (previouslyOnlineMembers.isEmpty()) {
            if (isPersistAdvisorDebugEnabled()) {
              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
                  "{}-{}: No previously online members. Recovering with the data from the local disk",
                  shortDiskStoreID, regionPath);
            }
            return advice;
          }

          // Filled in by getMembersToWaitFor and later reported via setWaitingOnMembers.
          Set<PersistentMemberID> offlineMembers = new HashSet<>();
          Set<PersistentMemberID> membersToWaitFor =
              persistenceAdvisor.getMembersToWaitFor(previouslyOnlineMembers, offlineMembers);

          if (membersToWaitFor.isEmpty()) {
            if (isPersistAdvisorDebugEnabled()) {
              // advice.getReplicates() is known to be empty on this path, so log the set the
              // message actually talks about.
              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
                  "{}-{}: All of the previously online members are now online and waiting for us. Acquiring tie lock. Previously online members {}",
                  shortDiskStoreID, regionPath, previouslyOnlineMembers);
            }
            if (persistenceAdvisor.acquireTieLock()) {
              return refreshInitialImageAdviceAndThenCheckMyStateWithReplicates(previousAdvice);
            }
            // Tie lock not acquired: fall through and wait for a membership change.
          } else {
            if (isPersistAdvisorDebugEnabled()) {
              logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
                  "{}-{}: Going to wait for these member ids: {}", shortDiskStoreID, regionPath,
                  membersToWaitFor);
            }
          }

          waitForMembershipChangeForMissingDiskStores(listener, offlineMembers, membersToWaitFor);
        } catch (InterruptedException e) {
          // NOTE(review): the interrupt status is not restored here; the loop simply goes around
          // again and relies on the cancel criterion check at the top. Confirm this is intended
          // before changing it, since restoring the flag would affect the next wait.
          logger.debug("Interrupted while trying to determine latest persisted copy", e);
        }
      }
    } finally {
      persistenceAdvisor.setWaitingOnMembers(null, null);
      cacheDistributionAdvisor.removeMembershipAndProxyListener(listener);
      persistenceAdvisor.removeListener(listener);
    }
  }

  /**
   * Asks the given peers, one at a time, to update our membership view and stops at the first one
   * that does not fail with a {@link ReplyException}. Failures are logged at debug level only; if
   * every peer fails the method returns normally.
   */
  private void updateMembershipViewFromAnyPeer(Set<InternalDistributedMember> peers,
      boolean recoverFromDisk) {
    for (InternalDistributedMember peer : peers) {
      try {
        persistenceAdvisor.updateMembershipView(peer, recoverFromDisk);
        return;
      } catch (ReplyException e) {
        if (isPersistAdvisorDebugEnabled()) {
          logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "Failed to update membership view", e);
        }
      }
    }
  }

  /**
   * Called once the tie lock has been acquired. Fetches fresh advice and, if replicates have
   * appeared in the meantime, checks this member's state on them.
   *
   * @return the freshly fetched advice
   */
  private InitialImageAdvice refreshInitialImageAdviceAndThenCheckMyStateWithReplicates(
      InitialImageAdvice previousAdvice) {
    if (isPersistAdvisorDebugEnabled()) {
      logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
          "{}-{}: Acquired the lock. This member will initialize", shortDiskStoreID, regionPath);
    }
    InitialImageAdvice advice = cacheDistributionAdvisor.adviseInitialImage(previousAdvice, true);
    if (hasReplicates(advice)) {
      if (isPersistAdvisorDebugEnabled()) {
        logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
            "{}-{}: Another member has initialized while we were getting the lock. We will initialize from that member",
            shortDiskStoreID, regionPath);
      }
      persistenceAdvisor.checkMyStateOnMembers(advice.getReplicates());
    }
    return advice;
  }

  /** Returns true if the advice lists at least one non-persistent member. */
  private boolean hasNonPersistentMember(InitialImageAdvice advice) {
    return !advice.getNonPersistent().isEmpty();
  }

  /**
   * Checks this member's state on the given replicates and records the outcome:
   * <ul>
   * <li>if any replicate reports that we have the same data on disk, the replicate set is emptied
   * in place so that no GII is attempted;</li>
   * <li>otherwise our equal members are cleared on the persistence advisor, leaving the replicates
   * in place as GII sources.</li>
   * </ul>
   *
   * @param replicates the replicate set taken from the advice; may be cleared by this method
   */
  private void removeReplicatesIfWeAreEqualToAnyOrElseClearEqualMembers(
      Set<InternalDistributedMember> replicates) {
    if (isPersistAdvisorDebugEnabled()) {
      logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
          "{}-{}: There are members currently online. Checking for our state on those members and then initializing",
          shortDiskStoreID, regionPath);
    }
    // Check with these members to make sure that they have heard of us. If any of them
    // say we have the same data on disk, we don't need to do a GII.
    boolean weAreEqualToAReplicate = persistenceAdvisor.checkMyStateOnMembers(replicates);
    if (weAreEqualToAReplicate) {
      // prevent GII by removing all replicates
      removeReplicates(replicates);
    } else {
      persistenceAdvisor.clearEqualMembers();
    }
  }

  /** Returns true if the advice is non-null and lists at least one replicate. */
  private boolean hasReplicates(InitialImageAdvice advice) {
    return advice != null && !advice.getReplicates().isEmpty();
  }

  /** Empties the given replicate set in place, logging its contents first at debug level. */
  private void removeReplicates(Set<InternalDistributedMember> replicates) {
    if (isPersistAdvisorDebugEnabled()) {
      logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
          "{}-{}: We have the same data on disk as one of {} recovering gracefully",
          shortDiskStoreID, regionPath, replicates);
    }
    replicates.clear();
  }

  /**
   * Publishes the members we are waiting on and blocks until the listener reports a change.
   * {@code endWaitingForMembershipChange} is always called, even if the wait is interrupted.
   *
   * @throws InterruptedException if the wait on the listener is interrupted
   */
  private void waitForMembershipChangeForMissingDiskStores(MembershipChangeListener listener,
      Set<PersistentMemberID> offlineMembers, Set<PersistentMemberID> membersToWaitFor)
      throws InterruptedException {
    persistenceAdvisor.beginWaitingForMembershipChange(membersToWaitFor);
    try {
      // The persistence advisor needs to know which members are really not available because the
      // user uses this information to decide which members they haven't started yet.
      // membersToWaitFor includes members that are still waiting to start up, but are waiting for
      // members other than the current member. So we pass the set of offline members here.
      persistenceAdvisor.setWaitingOnMembers(membersToWaitFor, offlineMembers);
      listener.waitForChange();
    } finally {
      persistenceAdvisor.endWaitingForMembershipChange();
    }
  }

  /** Returns true if debug logging is enabled for the persist-advisor verbose marker. */
  private boolean isPersistAdvisorDebugEnabled() {
    return logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE);
  }
}
