/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.clustermanager;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.ProcessorLocality;
import org.apache.samza.job.model.JobModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Encapsulates logic and state concerning standby-containers.
 */
public class StandbyContainerManager {

  // Class-wide SLF4J logger
  private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class);

  // Shared application state: this class reads its runningProcessors / pendingProcessors maps and
  // increments its failover-related counters
  private final SamzaApplicationState samzaApplicationState;

  // Used to read the last-known host of a standby container when selecting a standby host
  private final LocalityManager localityManager;

  // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
  // This is used for checking no two standbys or active-standby-pair are started on the same host
  private final Map<String, List<String>> standbyContainerConstraints;

  // Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure)
  private final Map<String, FailoverMetadata> failovers;

  // Resource-manager, used to stop containers
  private ClusterResourceManager clusterResourceManager;

  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
      ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
    // plain wiring of collaborators and empty state
    this.samzaApplicationState = samzaApplicationState;
    this.clusterResourceManager = clusterResourceManager;
    this.localityManager = localityManager;
    this.failovers = new ConcurrentHashMap<>();
    this.standbyContainerConstraints = new HashMap<>();

    // for every container in the job model, remember the containers it must not share a host with
    JobModel model = samzaApplicationState.jobModelManager.jobModel();
    for (String id : model.getContainers().keySet()) {
      this.standbyContainerConstraints.put(id, StandbyTaskUtil.getStandbyContainerConstraints(id, model));
    }

    log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
  }

  /**
   * We handle the stopping of a container depending on the case which is decided using the exit-status:
   *    Case 1. an Active-Container which has stopped for an "unknown" reason, then we start it on the given preferredHost (but we record the resource-request)
   *    Case 2. Active container has stopped because of node failure, then we initiate a failover
   *    Case 3. StandbyContainer has stopped after it was chosen for failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
   *    Case 4. StandbyContainer has stopped but not because of a failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
   *
   * @param containerID containerID of the stopped container
   * @param resourceID last resourceID of the stopped container
   * @param preferredHost the host on which the container was running
   * @param exitStatus the exit status of the failed container
   * @param containerAllocator the container allocator
   */
  public void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus,
      ContainerAllocator containerAllocator, Duration preferredHostRetryDelay) {

    // standby containers have their own stop-handling path
    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
      handleStandbyContainerStop(containerID, resourceID, preferredHost, containerAllocator, preferredHostRetryDelay);
      return;
    }

    // these exit statuses trigger a failover for the active container straight away
    boolean failoverRequired = exitStatus == SamzaResourceStatus.DISK_FAIL
        || exitStatus == SamzaResourceStatus.ABORTED
        || exitStatus == SamzaResourceStatus.PREEMPTED;
    if (failoverRequired) {
      initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
      return;
    }

    // any other exit status: retry on the preferred host, but remember the request so that a failover
    // (stop a standby and start the active on the standby's host) can be done if the request expires
    log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost);
    SamzaResourceRequest retryRequest =
        containerAllocator.getResourceRequestWithDelay(containerID, preferredHost, preferredHostRetryDelay);

    // the request is recorded before it is issued
    FailoverMetadata metadata = registerActiveContainerFailure(containerID, resourceID);
    metadata.recordResourceRequest(retryRequest);
    containerAllocator.issueResourceRequest(retryRequest);
  }

  /**
   * Handle the failed launch of a container, based on
   *    Case 1. If it is an active container, then initiate a failover for it.
   *    Case 2. If it is standby container, request a new resource on AnyHost.
   * @param containerID the ID of the container that has failed
   */
  public void handleContainerLaunchFail(String containerID, String resourceID,
      ContainerAllocator containerAllocator) {

    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
      // the message has two placeholders, so both arguments must be supplied; previously the second
      // "{}" had no argument and was emitted literally
      log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID,
          ResourceRequestState.ANY_HOST);
      // a standby that failed to launch is simply re-requested on any host
      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
    } else {
      // an active container that failed to launch goes through standby-aware allocation (failover)
      initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
    }
  }

  /**
   *  Handle the failed stop for a container, based on
   *  Case 1. If it is standby container, continue the failover
   *  Case 2. If it is an active container, then this is an invalid state and we throw an exception to alarm/restart.
   * @param containerID the ID (e.g., 0, 1, 2) of the container that has failed
   * @param resourceID id of the resource used for the failed container
   */
  public void handleContainerStopFail(String containerID, String resourceID,
      ContainerAllocator containerAllocator) {
    // a stop-fail callback for an active container is not expected by this class
    if (!StandbyTaskUtil.isStandbyContainer(containerID)) {
      throw new SamzaException("Invalid State. Received stop container fail for container Id: " + containerID);
    }

    log.info("Handling stop fail for standby-container {}, continuing the failover (if present)", containerID);

    // a metadata entry exists only if this standby resource had been selected for a failover
    Optional<StandbyContainerManager.FailoverMetadata> usedInFailover = this.checkIfUsedForFailover(resourceID);
    if (usedInFailover.isPresent()) {
      // carry on with the failover of the corresponding active container (another standby or any-host)
      StandbyContainerManager.FailoverMetadata metadata = usedInFailover.get();
      initiateStandbyAwareAllocation(metadata.activeContainerID, metadata.activeContainerResourceID,
          containerAllocator);
    }
  }

  /**
   *  If a standby container has stopped, then there are two possible cases
   *    Case 1. during a failover, the standby container was stopped for an active's start, then we
   *       1. request a resource on the standby's host to place the activeContainer, and
   *       2. request anyhost to place this standby
   *
   *    Case 2. independent of a failover, the standby container stopped, in which case we proceed with its resource-request
   * @param standbyContainerID SamzaContainerID of the standby container
   * @param preferredHost Preferred host of the standby container
   */
  private void handleStandbyContainerStop(String standbyContainerID, String resourceID, String preferredHost,
      ContainerAllocator containerAllocator, Duration preferredHostRetryDelay) {

    // a metadata entry exists only if this standby resource was stopped as part of a failover
    Optional<StandbyContainerManager.FailoverMetadata> usedInFailover = this.checkIfUsedForFailover(resourceID);

    if (!usedInFailover.isPresent()) {
      // not related to a failover: put the standby back on its preferred host
      log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
          standbyContainerID, preferredHost);
      containerAllocator.requestResourceWithDelay(standbyContainerID, preferredHost, preferredHostRetryDelay);
      return;
    }

    StandbyContainerManager.FailoverMetadata metadata = usedInFailover.get();
    String activeContainerID = metadata.activeContainerID;
    String hostOfStoppedStandby = metadata.getStandbyContainerHostname(resourceID);

    log.info("Requesting resource for active container {} on host {}, and backup container {} on any host",
        activeContainerID, hostOfStoppedStandby, standbyContainerID);

    // the active container takes over the stopped standby's host
    SamzaResourceRequest requestForActive =
        containerAllocator.getResourceRequestWithDelay(activeContainerID, hostOfStoppedStandby, preferredHostRetryDelay);
    // record first, issue second, to avoid a race with the allocation-thread
    metadata.recordResourceRequest(requestForActive);
    containerAllocator.issueResourceRequest(requestForActive);

    // the standby itself is re-requested on any host
    containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
  }

  /** Method to handle standby-aware allocation for an active container.
   *  We try to find a standby host for the active container, and issue a stop on any standby-containers running on it,
   *  request resource to place the active on the standby's host, and one to place the standby elsewhere.
   *
   * @param activeContainerID the samzaContainerID of the active-container
   * @param resourceID  the samza-resource-ID of the container when it failed (used to index failover-state)
   */
  private void initiateStandbyAwareAllocation(String activeContainerID, String resourceID,
      ContainerAllocator containerAllocator) {

    String standbyHost = this.selectStandbyHost(activeContainerID, resourceID);

    // no standby host could be selected: issue a plain any-host request and stop here
    if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
      log.info("No standby container found for active container {}, making a resource-request for placing {} on {}, active's resourceID: {}",
          activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST, resourceID);
      samzaApplicationState.failoversToAnyHost.incrementAndGet();
      containerAllocator.requestResource(activeContainerID, ResourceRequestState.ANY_HOST);
      return;
    }

    // collect the standbys of this active container that are currently running on the selected host
    List<String> standbyIds = this.standbyContainerConstraints.get(activeContainerID);
    Map<String, SamzaResource> standbysToStop = new HashMap<>();
    this.samzaApplicationState.runningProcessors.forEach((processorId, resource) -> {
      boolean isStandbyOfActive = standbyIds.contains(processorId);
      if (isStandbyOfActive && resource.getHost().equals(standbyHost)) {
        standbysToStop.put(processorId, resource);
      }
    });

    if (standbysToStop.isEmpty()) {
      // nothing to stop on that host, so the resource request for the active can be made directly
      log.info("No running standby container to stop on host {}, making a resource-request for placing {} on {}, active's resourceID: {}",
          standbyHost, activeContainerID, standbyHost, resourceID);
      FailoverMetadata metadata = this.registerActiveContainerFailure(activeContainerID, resourceID);

      // record first, issue second, to avoid a race with the allocation-thread
      SamzaResourceRequest requestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyHost);
      metadata.recordResourceRequest(requestForActive);
      containerAllocator.issueResourceRequest(requestForActive);
      samzaApplicationState.failoversToStandby.incrementAndGet();
      return;
    }

    // a standby is running on the host: issue a stop; the stop-complete callback drives the rest of the flow
    FailoverMetadata metadata = this.registerActiveContainerFailure(activeContainerID, resourceID);
    for (SamzaResource standbyResource : standbysToStop.values()) {
      log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, "
              + "for active container {}", standbysToStop.keySet(),
          standbysToStop.values(), activeContainerID);
      metadata.updateStandbyContainer(standbyResource.getContainerId(), standbyResource.getHost());
      samzaApplicationState.failoversToStandby.incrementAndGet();
      this.clusterResourceManager.stopStreamProcessor(standbyResource);
    }

    // more than one standby on the same host is an invalid state, so we fail the deploy and retry
    if (standbysToStop.size() > 1) {
      throw new SamzaException(
          "Invalid State. Multiple standby containers found running on one host:" + standbysToStop);
    }
  }

  /**
   * Method to select a standby host for a given active container.
   * 1. We first try to select a host which has a running standby-container, that we haven't already selected for failover.
   * 2. If we don't find any such host, we iterate over last-known standbyHosts, if we haven't already selected it for failover.
   * 3. If we still don't find a host, we fall back to AnyHost.
   *
   * See https://issues.apache.org/jira/browse/SAMZA-2140
   *
   * @param activeContainerID Samza containerID of the active container
   * @param activeContainerResourceID ResourceID of the active container at the time of its last failure
   * @return standby host for the active container (if found), Any-host otherwise.
   */
  private String selectStandbyHost(String activeContainerID, String activeContainerResourceID) {

    List<String> standbyContainerIDs = this.standbyContainerConstraints.get(activeContainerID);
    log.info("Standby containers {} for active container {} (resourceID {})", standbyContainerIDs, activeContainerID, activeContainerResourceID);

    // obtain any existing failover metadata
    Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(activeContainerResourceID);

    // Iterate over the list of running standby containers, to find a standby resource that we have not already
    // used for a failover for this active resource
    for (String standbyContainerID : standbyContainerIDs) {

      // single lookup instead of containsKey + get, so the entry cannot disappear between the two calls
      SamzaResource standbyContainerResource = samzaApplicationState.runningProcessors.get(standbyContainerID);
      if (standbyContainerResource == null) {
        continue;
      }

      // use this standby if there was no previous failover for which this standbyResource was used
      boolean alreadyUsed = failoverMetadata.isPresent()
          && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getContainerId());
      if (!alreadyUsed) {
        log.info("Returning standby container {} in running state on host {} for active container {}",
            standbyContainerID, standbyContainerResource.getHost(), activeContainerID);
        return standbyContainerResource.getHost();
      }
    }
    log.info("Did not find any running standby container for active container {}", activeContainerID);

    // We iterate over the list of last-known standbyHosts to check if anyone of them has not already been tried
    for (String standbyContainerID : standbyContainerIDs) {

      String standbyHost =
          Optional.ofNullable(localityManager.readLocality().getProcessorLocality(standbyContainerID))
              .map(ProcessorLocality::host)
              .orElse(null);

      // A blank/null host means there is no last-known location for this standby. The check must be
      // isBlank: with isNotBlank every valid host was skipped and a null host could be returned to the caller.
      if (StringUtils.isBlank(standbyHost)) {
        log.info("No last known standbyHost for container {}", standbyContainerID);
      } else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {

        log.info("Not using standby host {} for active container {} because it had already been selected", standbyHost,
            activeContainerID);
      } else {
        // standbyHost is valid and has not already been selected
        log.info("Returning standby host {} for active container {}", standbyHost, activeContainerID);
        return standbyHost;
      }
    }

    log.info("Did not find any standby host for active container {}, returning any-host", activeContainerID);
    return ResourceRequestState.ANY_HOST;
  }

  /**
   * Register the failure of an active container (identified by its resource ID).
   */
  private FailoverMetadata registerActiveContainerFailure(String activeContainerID, String activeContainerResourceID) {
    // Reuse the metadata already registered for this resource ID, or create and register a new one.
    // computeIfAbsent does this in one atomic step on the concurrent map, so two callers registering
    // the same resource ID at the same time cannot end up with two different metadata objects.
    return this.failovers.computeIfAbsent(activeContainerResourceID,
        id -> new FailoverMetadata(activeContainerID, id));
  }

  /**
   * Check if this standbyContainerResource is present in the failoverState for an active container.
   * This is used to determine if we requested a stop of this container.
   */
  private Optional<FailoverMetadata> checkIfUsedForFailover(String standbyContainerResourceId) {

    // without a resource ID there is nothing to match against
    if (standbyContainerResourceId == null) {
      return Optional.empty();
    }

    // first failover (if any) that selected this standby resource
    Optional<FailoverMetadata> match = failovers.values()
        .stream()
        .filter(metadata -> metadata.isStandbyResourceUsed(standbyContainerResourceId))
        .findFirst();

    match.ifPresent(
      metadata -> log.info("Standby container with resource id {} was selected for failover of active container {}",
          standbyContainerResourceId, metadata.activeContainerID));
    return match;
  }

  /**
   * Check if matching this SamzaResourceRequest to the given resource meets all standby-container constraints.
   *
   * @param containerIdToStart logical id of the container to start
   * @param host potential host to start the container on
   * @return true if no conflicting container is pending or running on the host, false otherwise
   */
  boolean checkStandbyConstraints(String containerIdToStart, String host) {
    // every container listed here must not share a host with containerIdToStart
    for (String conflictingId : this.standbyContainerConstraints.get(containerIdToStart)) {

      // a conflicting container that is pending launch on the host rules the host out
      SamzaResource pending = samzaApplicationState.pendingProcessors.get(conflictingId);
      boolean pendingOnHost = pending != null && pending.getHost().equals(host);
      if (pendingOnHost) {
        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
            containerIdToStart, host, conflictingId);
        return false;
      }

      // so does a conflicting container that is already running there
      SamzaResource running = samzaApplicationState.runningProcessors.get(conflictingId);
      boolean runningOnHost = running != null && running.getHost().equals(host);
      if (runningOnHost) {
        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
            containerIdToStart, host, conflictingId);
        return false;
      }
    }

    return true;
  }

  /**
   *  Attempt to run a container on the given candidate resource, if doing so meets the standby container constraints.
   * @param request The Samza container request
   * @param preferredHost the preferred host associated with the container
   * @param samzaResource the resource candidate
   */
  public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost,
      SamzaResource samzaResource, ContainerAllocator containerAllocator,
      ResourceRequestState resourceRequestState) {
    String containerID = request.getProcessorId();

    // constraints satisfied: the candidate resource can be used to launch the container
    if (checkStandbyConstraints(containerID, samzaResource.getHost())) {
      log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
          samzaResource.getHost(), preferredHost);
      containerAllocator.runStreamProcessor(request, preferredHost);
      return;
    }

    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
      // a standby container that cannot use this resource gets a fresh any-host request
      log.info(
          "Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
          containerID, samzaResource.getHost());
      releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
      samzaApplicationState.failedStandbyAllocations.incrementAndGet();
      return;
    }

    // an active container that cannot use this resource goes through a failover
    log.warn(
        "Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
        containerID, samzaResource.getHost());
    releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);

    // index the failover by the resource ID recorded for this request, or a placeholder if none is known
    Optional<FailoverMetadata> priorFailover = getFailoverMetadata(request);
    String lastKnownResourceID;
    if (priorFailover.isPresent()) {
      lastKnownResourceID = priorFailover.get().activeContainerResourceID;
    } else {
      lastKnownResourceID = "unknown-" + containerID;
    }
    initiateStandbyAwareAllocation(containerID, lastKnownResourceID, containerAllocator);
    samzaApplicationState.failedStandbyAllocations.incrementAndGet();
  }

  /**
   * Handle an expired resource request
   * @param containerID the containerID for which the resource request was made
   * @param request the expired resource request
   * @param alternativeResource an alternative, already-allocated, resource (if available)
   * @param containerAllocator the container allocator (used to issue any required subsequent resource requests)
   * @param resourceRequestState used to cancel resource requests if required.
   */
  public void handleExpiredResourceRequest(String containerID, SamzaResourceRequest request,
      Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
      ResourceRequestState resourceRequestState) {

    // active containers ignore the alternative resource and go through the failover path
    if (!StandbyTaskUtil.isStandbyContainer(containerID)) {
      handleExpiredRequestForActiveContainer(containerID, request, containerAllocator, resourceRequestState);
      return;
    }

    handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator,
        resourceRequestState);
  }

  /**
   * Fetches a list of standby container for an active container
   * @param activeContainerId logical id of the container ex: 0,1,2
   * @return list of standby containers ex: for active container 0: {0-0, 0-1}
   */
  List<String> getStandbyList(String activeContainerId) {
    // direct lookup; the list returned is the one stored in the constraints map (no copy is made)
    final List<String> standbyContainerIds = standbyContainerConstraints.get(activeContainerId);
    return standbyContainerIds;
  }

  /**
   * Release un-startable resources immediately and delete the requests corresponding to them
   */
  void releaseUnstartableContainer(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
      ResourceRequestState resourceRequestState) {
    // hand the resource back first ...
    resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
    // ... then cancel the request that the resource had been matched to
    resourceRequestState.cancelResourceRequest(request);
  }


  // Handle an expired resource request that was made for placing a standby container
  private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
      Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
      ResourceRequestState resourceRequestState) {

    if (!alternativeResource.isPresent()) {
      // no alternative resource is available: cancel the expired request and ask for any host
      log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
      resourceRequestState.cancelResourceRequest(request);
      containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
      return;
    }

    // the standby can start on the alternative resource right away, provided it passes all the
    // standby constraints
    SamzaResource alternative = alternativeResource.get();
    log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID,
        alternative);
    checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternative,
        containerAllocator, resourceRequestState);
  }

  // Handle an expired resource request that was made for placing an active container
  private void handleExpiredRequestForActiveContainer(String containerID, SamzaResourceRequest request,
      ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {

    log.info("Handling expired request for active container {}", containerID);

    // look up the failover this request belongs to before the request is cancelled
    Optional<FailoverMetadata> priorFailover = getFailoverMetadata(request);
    resourceRequestState.cancelResourceRequest(request);

    // the failover is indexed by the active container's resource ID; when no prior failure is
    // known for this active container, an "unknown-" placeholder is used instead
    String lastKnownResourceID;
    if (priorFailover.isPresent()) {
      lastKnownResourceID = priorFailover.get().activeContainerResourceID;
    } else {
      lastKnownResourceID = "unknown-" + containerID;
    }
    log.info("Handling expired request for active container {}, lastKnownResourceID is {}", containerID, lastKnownResourceID);
    initiateStandbyAwareAllocation(containerID, lastKnownResourceID, containerAllocator);
  }

  /**
   * Check if an activeContainerResource has failover-metadata associated with it
   */
  private Optional<FailoverMetadata> getFailoverMetadata(String activeContainerResourceID) {
    // One lookup instead of containsKey followed by get: the result is consistent even if the map
    // changes between the two calls, and an absent key maps directly to Optional.empty().
    return Optional.ofNullable(this.failovers.get(activeContainerResourceID));
  }

  /**
   * Check if a SamzaResourceRequest was issued for a failover.
   */
  private Optional<FailoverMetadata> getFailoverMetadata(SamzaResourceRequest resourceRequest) {
    // first registered failover (if any) that recorded this exact resource request
    return this.failovers.values()
        .stream()
        .filter(metadata -> metadata.containsResourceRequest(resourceRequest))
        .findFirst();
  }

  @Override
  public String toString() {
    // the string form of this manager is just the string form of its failover map
    return String.valueOf(this.failovers);
  }

  /**
   * Encapsulates metadata concerning the failover of an active container.
   */
  public class FailoverMetadata {
    // logical ID of the active container this failover is for
    public final String activeContainerID;
    // resource ID of the active container, used as the key of the failovers map
    public final String activeContainerResourceID;

    // Map of samza-container-resource ID to host, for each standby container selected for failover of the activeContainer
    // (guarded by this object's monitor)
    private final Map<String, String> selectedStandbyContainers;

    // Resource requests issued during this failover (guarded by this object's monitor)
    private final Set<SamzaResourceRequest> resourceRequests;

    public FailoverMetadata(String activeContainerID, String activeContainerResourceID) {
      this.activeContainerID = activeContainerID;
      this.activeContainerResourceID = activeContainerResourceID;
      this.selectedStandbyContainers = new HashMap<>();
      this.resourceRequests = new HashSet<>();
    }

    // Check if this standbyContainerResourceID was used in this failover
    public synchronized boolean isStandbyResourceUsed(String standbyContainerResourceID) {
      return this.selectedStandbyContainers.containsKey(standbyContainerResourceID);
    }

    // Get the hostname corresponding to the standby resourceID (null if the resource was not selected)
    public synchronized String getStandbyContainerHostname(String standbyContainerResourceID) {
      return this.selectedStandbyContainers.get(standbyContainerResourceID);
    }

    // Add the standbyContainer resource to the list of standbyContainers used in this failover
    public synchronized void updateStandbyContainer(String standbyContainerResourceID, String standbyContainerHost) {
      this.selectedStandbyContainers.put(standbyContainerResourceID, standbyContainerHost);
    }

    // Add the samzaResourceRequest to the list of resource requests associated with this failover
    public synchronized void recordResourceRequest(SamzaResourceRequest samzaResourceRequest) {
      this.resourceRequests.add(samzaResourceRequest);
    }

    // Check if this resource request has been issued in this failover
    public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) {
      return this.resourceRequests.contains(samzaResourceRequest);
    }

    // Check if this standby-host has been used in this failover, that is, if this host matches the host of a selected-standby,
    // or if we have made a resourceRequest for this host.
    // Synchronized like every other accessor: it reads both non-thread-safe collections, which the
    // synchronized mutators above may be modifying.
    public synchronized boolean isStandbyHostUsed(String standbyHost) {
      return this.selectedStandbyContainers.containsValue(standbyHost)
          || this.resourceRequests.stream().anyMatch(request -> request.getPreferredHost().equals(standbyHost));
    }

    // Synchronized because it iterates both guarded collections to render them.
    @Override
    public synchronized String toString() {
      return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: "
          + this.activeContainerResourceID + " selectedStandbyContainers:" + selectedStandbyContainers
          + " resourceRequests: " + resourceRequests + "]";
    }
  }
}
