/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.partitioned;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.BucketPersistenceAdvisor;
import org.apache.geode.internal.cache.ColocationHelper;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.PRHARedundancyProvider;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.ProxyBucketRegion;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentStateListener;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.process.StartupStatus;
import org.apache.geode.internal.util.TransformUtils;

/**
 * Consolidates logging during the recovery of ProxyRegionBuckets that are not hosted by this
 * member. The logger is meant to run in its own thread.
 * It uses a count down latch to determine whether the recovery is finished.
 */
public class PersistentBucketRecoverer extends RecoveryRunnable implements PersistentStateListener {

  private static final Logger logger = LogService.getLogger();

  /**
   * True when one or more buckets have reported a change in status.
   */
  private volatile boolean membershipChanged = true;

  /**
   * Sleep period between posting log entries.
   */
  private static final int SLEEP_PERIOD = 15000;

  /**
   * Used to determine when all proxy buckets have been recovered.
   */
  private final CountDownLatch allBucketsRecoveredFromDisk;

  private final List<RegionStatus> regions;


  /**
   * Creates a new PersistentBucketRecoverer.
   *
   */
  public PersistentBucketRecoverer(PRHARedundancyProvider prhaRedundancyProvider,
      int proxyBuckets) {
    super(prhaRedundancyProvider);
    PartitionedRegion baseRegion =
        ColocationHelper.getLeaderRegion(redundancyProvider.getPartitionedRegion());
    List<PartitionedRegion> colocatedRegions =
        getColocatedChildRegions(baseRegion);
    List<RegionStatus> allRegions = new ArrayList<RegionStatus>(colocatedRegions.size() + 1);
    if (baseRegion.getDataPolicy().withPersistence()) {
      allRegions.add(new RegionStatus(baseRegion));
    }
    for (PartitionedRegion region : colocatedRegions) {
      if (region.getDataPolicy().withPersistence()) {
        allRegions.add(new RegionStatus(region));
      }
    }

    regions = Collections.unmodifiableList(allRegions);
    allBucketsRecoveredFromDisk = new CountDownLatch(proxyBuckets);
    membershipChanged = true;
    addListeners();

  }

  List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion baseRegion) {
    return ColocationHelper.getColocatedChildRegions(baseRegion);
  }

  public void startLoggingThread() {
    Thread loggingThread = new LoggingThread(
        "PersistentBucketRecoverer for region "
            + redundancyProvider.getPartitionedRegion().getName(),
        false,
        this);
    loggingThread.start();
  }

  /**
   * Called when a member comes online for a bucket.
   */
  @Override
  public void memberOnline(InternalDistributedMember member, PersistentMemberID persistentID) {
    this.membershipChanged = true;
  }

  /**
   * Called when a member goes offline for a bucket.
   */
  @Override
  public void memberOffline(InternalDistributedMember member, PersistentMemberID persistentID) {
    this.membershipChanged = true;
  }

  /**
   * Called when a member is removed for a bucket.
   */
  @Override
  public void memberRemoved(PersistentMemberID persistentID, boolean revoked) {
    this.membershipChanged = true;
  }



  /**
   * Add this PersistentBucketRecoverer as a persistence listener to all the region's bucket
   * advisors.
   */
  private void addListeners() {
    for (RegionStatus region : regions) {
      region.addListeners();
    }
  }

  /**
   * Removes this PersistentBucketRecoverer as a persistence listener from all the region's bucket
   * advisors.
   */
  private void removeListeners() {
    for (RegionStatus region : regions) {
      region.removeListeners();
    }
  }



  /**
   * Writes a consolidated log entry every SLEEP_PERIOD that summarizes which buckets are still
   * waiting on persistent members for the region.
   */
  @Override
  public void run2() {
    try {
      boolean warningLogged = false;
      while (getLatchCount() > 0) {
        int sleepMillis = SLEEP_PERIOD;
        // reduce the first log time from 15secs so that higher layers can
        // report sooner to user
        if (!warningLogged) {
          sleepMillis = SLEEP_PERIOD / 2;
        }
        Thread.sleep(sleepMillis);

        if (this.membershipChanged) {
          this.membershipChanged = false;
          for (RegionStatus region : regions) {
            region.logWaitingForMembers();
          }
          warningLogged = true;
        }
      }

    } catch (InterruptedException e) {
      // Log and bail
      logger.error(e.getMessage(), e);
    } finally {
      /*
       * Our job is done. Stop listening to the bucket advisors.
       */
      removeListeners();
      /*
       * Make sure the recovery completion message was printed to the log.
       */
      for (RegionStatus region : regions) {
        if (!region.loggedDoneMessage) {
          region.logDoneMessage();
        }
      }
    }
  }


  /**
   * Keeps track of logging a message for a single partitioned region and logging a separate message
   * when the waiting is done for the same region
   *
   */
  private class RegionStatus {
    /**
     * The persistent identifier of the member running this PersistentBucketRecoverer.
     */
    private final PersistentMemberID thisMember;

    /**
     * The region that the proxy buckets belong to.
     */
    private final String region;

    /**
     * An array of ProxyBucketRegions that comprise this partitioned region.
     */
    private final ProxyBucketRegion[] bucketRegions;

    /**
     * Indicates that a completion message has been logged.
     */
    private volatile boolean loggedDoneMessage = true;

    public RegionStatus(PartitionedRegion region) {
      this.thisMember = createPersistentMemberID(region);
      this.region = region.getFullPath();
      this.bucketRegions = region.getRegionAdvisor().getProxyBucketArray();
    }

    public void removeListeners() {
      for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
        proxyBucket.getPersistenceAdvisor().removeListener(PersistentBucketRecoverer.this);
      }
    }

    public void addListeners() {
      for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
        proxyBucket.getPersistenceAdvisor().addListener(PersistentBucketRecoverer.this);
      }
    }

    /**
     * Creates a temporary (and somewhat fake) PersistentMemberID for this member if there is no
     * DiskStore available for our region (which can happen in some colocated scenarios).
     */
    private PersistentMemberID createPersistentMemberID(PartitionedRegion region) {
      DiskStoreImpl diskStore = null;

      /*
       * A non-persistent colocated region will not have a disk store so check the leader region if
       * this region does not have one.
       */
      if (region.getAttributes().getDataPolicy().withPersistence()) {
        diskStore = region.getDiskStore();
      } else if (ColocationHelper.getLeaderRegion(region).getAttributes().getDataPolicy()
          .withPersistence()) {
        diskStore = ColocationHelper.getLeaderRegion(region).getDiskStore();
      }

      /*
       * We have a DiskStore? Great! Simply have it generate the id.
       */
      if (null != diskStore) {
        return diskStore.generatePersistentID();
      }

      /*
       * Bummer. No DiskStore. Put together a fake one (for logging only).
       */
      {
        String name = "No name for this member";
        String diskDir = System.getProperty("user.dir");
        InetAddress localHost = null;

        try {
          localHost = SocketCreator.getLocalHost();
        } catch (UnknownHostException e) {
          logger.error("Could not determine my own host", e);
        }

        return (new PersistentMemberID(null, localHost, diskDir, name,
            redundancyProvider.getPartitionedRegion().getCache().cacheTimeMillis(), (short) 0));
      }
    }

    /**
     * Returns a unique Set of persistent members that all the ProxyBucketRegions are waiting for.
     *
     * @param offlineOnly true if only the members which are not currently try running should be
     *        returned, false to return all members that this member is waiting for, including
     *        members which are running but not fully initialized.
     */
    private Map<PersistentMemberID, Set<Integer>> getMembersToWaitFor(boolean offlineOnly) {
      Map<PersistentMemberID, Set<Integer>> waitingForMembers =
          new HashMap<PersistentMemberID, Set<Integer>>();


      for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
        Integer bucketId = proxyBucket.getBucketId();

        // Get the set of missing members from the persistence advisor
        Set<PersistentMemberID> missingMembers;
        BucketPersistenceAdvisor persistenceAdvisor = proxyBucket.getPersistenceAdvisor();
        if (offlineOnly) {
          missingMembers = persistenceAdvisor.getMissingMembers();
        } else {
          missingMembers = persistenceAdvisor.getAllMembersToWaitFor();
        }

        if (missingMembers != null) {
          for (PersistentMemberID missingMember : missingMembers) {
            Set<Integer> buckets = waitingForMembers.get(missingMember);
            if (buckets == null) {
              buckets = new TreeSet<Integer>();
              waitingForMembers.put(missingMember, buckets);
            }
            buckets.add(bucketId);
          }
        }
      }

      return waitingForMembers;
    }

    /**
     * Prints a recovery completion message to the log.
     */
    private void logDoneMessage() {
      this.loggedDoneMessage = true;
      StartupStatus.startup(
          String.format(
              "Region %s has successfully completed waiting for other members to recover the latest data.My persistent member information:%s",
              new Object[] {this.region,
                  TransformUtils.persistentMemberIdToLogEntryTransformer
                      .transform(this.thisMember)}));
    }

    /**
     * Logs a consolidated log entry for all ProxyBucketRegions waiting for persistent members.
     */
    private void logWaitingForMembers() {
      Map<PersistentMemberID, Set<Integer>> offlineMembers = getMembersToWaitFor(true);
      Map<PersistentMemberID, Set<Integer>> allMembersToWaitFor = getMembersToWaitFor(false);

      boolean thereAreBucketsToBeRecovered = (getLatchCount() > 0);

      /*
       * Log any offline members the region is waiting for.
       */
      if (thereAreBucketsToBeRecovered && !offlineMembers.isEmpty()) {
        Set<String> membersToWaitForLogEntries = new HashSet<String>();

        TransformUtils.transform(offlineMembers.entrySet(), membersToWaitForLogEntries,
            TransformUtils.persistentMemberEntryToLogEntryTransformer);

        Set<Integer> missingBuckets = getAllWaitingBuckets(offlineMembers);

        StartupStatus.startup(
            String.format(
                "Region %s (and any colocated sub-regions) has potentially stale data.  Buckets %s are waiting for another offline member to recover the latest data.My persistent id is:%sOffline members with potentially new data:%sUse the gfsh show missing-disk-stores command to see all disk stores that are being waited on by other members.",
                new Object[] {this.region, missingBuckets,
                    TransformUtils.persistentMemberIdToLogEntryTransformer
                        .transform(this.thisMember),
                    membersToWaitForLogEntries}));

        this.loggedDoneMessage = false;
      }
      /*
       * No offline? Then log any online members the region is waiting for.
       */
      else if (thereAreBucketsToBeRecovered && !allMembersToWaitFor.isEmpty()) {
        Set<String> membersToWaitForLogEntries = new HashSet<String>();

        Set<Integer> missingBuckets = getAllWaitingBuckets(allMembersToWaitFor);
        TransformUtils.transform(allMembersToWaitFor.entrySet(), membersToWaitForLogEntries,
            TransformUtils.persistentMemberEntryToLogEntryTransformer);

        StartupStatus.startup(
            String.format(
                "Region %s (and any colocated sub-regions) has potentially stale data.  Buckets %s are waiting for another online member to recover the latest data.My persistent id is:%sOnline members with potentially new data:%sUse the gfsh show missing-disk-stores command to see all disk stores that are being waited on by other members.",
                new Object[] {this.region, missingBuckets,
                    TransformUtils.persistentMemberIdToLogEntryTransformer
                        .transform(this.thisMember),
                    membersToWaitForLogEntries}));


        this.loggedDoneMessage = false;
      }
      /*
       * No online? Then log that we are done.
       */
      else if (!this.loggedDoneMessage) {
        logDoneMessage();
      }
    }

    /**
     * Get a consolodated set of all buckets that are waiting.
     */
    private Set<Integer> getAllWaitingBuckets(
        Map<PersistentMemberID, Set<Integer>> offlineMembers) {
      Set<Integer> allWaitingBuckets = new TreeSet<Integer>();
      for (Set<Integer> missingPerMember : offlineMembers.values()) {
        allWaitingBuckets.addAll(missingPerMember);
      }
      return allWaitingBuckets;
    }
  }

  public void await(long timeout, TimeUnit unit) {
    boolean interrupted = false;
    while (true) {
      try {
        redundancyProvider.getPartitionedRegion().getCancelCriterion().checkCancelInProgress(null);
        boolean done = allBucketsRecoveredFromDisk.await(timeout, unit);
        if (done) {
          break;
        }
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  public void await() {
    boolean interrupted = false;
    while (true) {
      try {
        getAllBucketsRecoveredFromDiskLatch().await();
        break;
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  public void countDown() {
    allBucketsRecoveredFromDisk.countDown();
  }

  public void countDown(int size) {
    while (size > 0) {
      allBucketsRecoveredFromDisk.countDown();
      --size;
    }
  }

  public boolean hasRecoveryCompleted() {
    if (getLatchCount() > 0) {
      return false;
    }
    return true;
  }

  long getLatchCount() {
    return allBucketsRecoveredFromDisk.getCount();
  }

  CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
    return allBucketsRecoveredFromDisk;
  }

}
