blob: b849ea5b5d07c2c4eb388b5c313914cc2c674b31 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.ProcessorLocality;
import org.apache.samza.job.model.JobModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Encapsulates logic and state concerning standby-containers.
*/
public class StandbyContainerManager {
private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class);
private final SamzaApplicationState samzaApplicationState;
private final LocalityManager localityManager;
// Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
// This is used for checking no two standbys or active-standby-pair are started on the same host
private final Map<String, List<String>> standbyContainerConstraints;
// Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure)
private final Map<String, FailoverMetadata> failovers;
// Resource-manager, used to stop containers
private ClusterResourceManager clusterResourceManager;
public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
ClusterResourceManager clusterResourceManager, LocalityManager localityManager) {
this.failovers = new ConcurrentHashMap<>();
this.localityManager = localityManager;
this.standbyContainerConstraints = new HashMap<>();
this.samzaApplicationState = samzaApplicationState;
JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
// populate the standbyContainerConstraints map by iterating over all containers
jobModel.getContainers()
.keySet()
.forEach(containerId -> standbyContainerConstraints.put(containerId,
StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel)));
this.clusterResourceManager = clusterResourceManager;
log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints);
}
/**
* We handle the stopping of a container depending on the case which is decided using the exit-status:
* Case 1. an Active-Container which has stopped for an "unknown" reason, then we start it on the given preferredHost (but we record the resource-request)
* Case 2. Active container has stopped because of node failure, thene we initiate a failover
* Case 3. StandbyContainer has stopped after it was chosen for failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
* Case 4. StandbyContainer has stopped but not because of a failover, see {@link StandbyContainerManager#handleStandbyContainerStop}
*
* @param containerID containerID of the stopped container
* @param resourceID last resourceID of the stopped container
* @param preferredHost the host on which the container was running
* @param exitStatus the exit status of the failed container
* @param containerAllocator the container allocator
*/
public void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus,
ContainerAllocator containerAllocator, Duration preferredHostRetryDelay) {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
handleStandbyContainerStop(containerID, resourceID, preferredHost, containerAllocator, preferredHostRetryDelay);
} else {
// initiate failover for the active container based on the exitStatus
switch (exitStatus) {
case SamzaResourceStatus.DISK_FAIL:
case SamzaResourceStatus.ABORTED:
case SamzaResourceStatus.PREEMPTED:
initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
break;
// in all other cases, request-resource for the failed container, but record the resource-request, so that
// if this request expires, we can do a failover -- select a standby to stop & start the active on standby's host
default:
log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost);
SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequestWithDelay(containerID, preferredHost, preferredHostRetryDelay);
FailoverMetadata failoverMetadata = registerActiveContainerFailure(containerID, resourceID);
failoverMetadata.recordResourceRequest(resourceRequest);
containerAllocator.issueResourceRequest(resourceRequest);
break;
}
}
}
/**
* Handle the failed launch of a container, based on
* Case 1. If it is an active container, then initiate a failover for it.
* Case 2. If it is standby container, request a new resource on AnyHost.
* @param containerID the ID of the container that has failed
*/
public void handleContainerLaunchFail(String containerID, String resourceID,
ContainerAllocator containerAllocator) {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID);
containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
} else {
initiateStandbyAwareAllocation(containerID, resourceID, containerAllocator);
}
}
/**
* Handle the failed stop for a container, based on
* Case 1. If it is standby container, continue the failover
* Case 2. If it is an active container, then this is in invalid state and throw an exception to alarm/restart.
* @param containerID the ID (e.g., 0, 1, 2) of the container that has failed
* @param resourceID id of the resource used for the failed container
*/
public void handleContainerStopFail(String containerID, String resourceID,
ContainerAllocator containerAllocator) {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
log.info("Handling stop fail for standby-container {}, continuing the failover (if present)", containerID);
// if this standbyContainerResource was stopped for a failover, we will find a metadata entry
Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID);
// if we find a metadata entry, we continue with the failover (select another standby or any-host appropriately)
failoverMetadata.ifPresent(
metadata -> initiateStandbyAwareAllocation(metadata.activeContainerID, metadata.activeContainerResourceID,
containerAllocator));
} else {
// If this class receives a callback for stop-fail on an active container, throw an exception
throw new SamzaException("Invalid State. Received stop container fail for container Id: " + containerID);
}
}
/**
* If a standby container has stopped, then there are two possible cases
* Case 1. during a failover, the standby container was stopped for an active's start, then we
* 1. request a resource on the standby's host to place the activeContainer, and
* 2. request anyhost to place this standby
*
* Case 2. independent of a failover, the standby container stopped, in which proceed with its resource-request
* @param standbyContainerID SamzaContainerID of the standby container
* @param preferredHost Preferred host of the standby container
*/
private void handleStandbyContainerStop(String standbyContainerID, String resourceID, String preferredHost,
ContainerAllocator containerAllocator, Duration preferredHostRetryDelay) {
// if this standbyContainerResource was stopped for a failover, we will find a metadata entry
Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID);
if (failoverMetadata.isPresent()) {
String activeContainerID = failoverMetadata.get().activeContainerID;
String standbyContainerHostname = failoverMetadata.get().getStandbyContainerHostname(resourceID);
log.info("Requesting resource for active container {} on host {}, and backup container {} on any host",
activeContainerID, standbyContainerHostname, standbyContainerID);
// request standbycontainer's host for active-container
SamzaResourceRequest resourceRequestForActive =
containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
// record the resource request, before issuing it to avoid race with allocation-thread
failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
containerAllocator.issueResourceRequest(resourceRequestForActive);
// request any-host for standby container
containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
} else {
log.info("Issuing request for standby container {} on host {}, since this is not for a failover",
standbyContainerID, preferredHost);
containerAllocator.requestResourceWithDelay(standbyContainerID, preferredHost, preferredHostRetryDelay);
}
}
/** Method to handle standby-aware allocation for an active container.
* We try to find a standby host for the active container, and issue a stop on any standby-containers running on it,
* request resource to place the active on the standby's host, and one to place the standby elsewhere.
*
* @param activeContainerID the samzaContainerID of the active-container
* @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state)
*/
private void initiateStandbyAwareAllocation(String activeContainerID, String resourceID,
ContainerAllocator containerAllocator) {
String standbyHost = this.selectStandbyHost(activeContainerID, resourceID);
// if the standbyHost returned is anyhost, we proceed with the request directly
if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
log.info("No standby container found for active container {}, making a resource-request for placing {} on {}, active's resourceID: {}",
activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST, resourceID);
samzaApplicationState.failoversToAnyHost.incrementAndGet();
containerAllocator.requestResource(activeContainerID, ResourceRequestState.ANY_HOST);
} else {
// Check if there is a running standby-container on that host that needs to be stopped
List<String> standbySamzaContainerIds = this.standbyContainerConstraints.get(activeContainerID);
Map<String, SamzaResource> runningStandbyContainersOnHost = new HashMap<>();
this.samzaApplicationState.runningProcessors.forEach((samzaContainerId, samzaResource) -> {
if (standbySamzaContainerIds.contains(samzaContainerId) && samzaResource.getHost().equals(standbyHost)) {
runningStandbyContainersOnHost.put(samzaContainerId, samzaResource);
}
});
if (runningStandbyContainersOnHost.isEmpty()) {
// if there are no running standby-containers on the standbyHost, we proceed to directly make a resource request
log.info("No running standby container to stop on host {}, making a resource-request for placing {} on {}, active's resourceID: {}",
standbyHost, activeContainerID, standbyHost, resourceID);
FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID);
// record the resource request, before issuing it to avoid race with allocation-thread
SamzaResourceRequest resourceRequestForActive =
containerAllocator.getResourceRequest(activeContainerID, standbyHost);
failoverMetadata.recordResourceRequest(resourceRequestForActive);
containerAllocator.issueResourceRequest(resourceRequestForActive);
samzaApplicationState.failoversToStandby.incrementAndGet();
} else {
// if there is a running standby-container on the standbyHost, we issue a stop (the stopComplete callback completes the remainder of the flow)
FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID);
runningStandbyContainersOnHost.forEach((standbyContainerID, standbyResource) -> {
log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, "
+ "for active container {}", runningStandbyContainersOnHost.keySet(),
runningStandbyContainersOnHost.values(), activeContainerID);
failoverMetadata.updateStandbyContainer(standbyResource.getContainerId(), standbyResource.getHost());
samzaApplicationState.failoversToStandby.incrementAndGet();
this.clusterResourceManager.stopStreamProcessor(standbyResource);
});
// if multiple standbys are on the same host, we are in an invalid state, so we fail the deploy and retry
if (runningStandbyContainersOnHost.size() > 1) {
throw new SamzaException(
"Invalid State. Multiple standby containers found running on one host:" + runningStandbyContainersOnHost);
}
}
}
}
/**
* Method to select a standby host for a given active container.
* 1. We first try to select a host which has a running standby-container, that we haven't already selected for failover.
* 2. If we dont any such host, we iterate over last-known standbyHosts, if we haven't already selected it for failover.
* 3. If still dont find a host, we fall back to AnyHost.
*
* See https://issues.apache.org/jira/browse/SAMZA-2140
*
* @param activeContainerID Samza containerID of the active container
* @param activeContainerResourceID ResourceID of the active container at the time of its last failure
* @return standby host for the active container (if found), Any-host otherwise.
*/
private String selectStandbyHost(String activeContainerID, String activeContainerResourceID) {
log.info("Standby containers {} for active container {} (resourceID {})", this.standbyContainerConstraints.get(activeContainerID), activeContainerID, activeContainerResourceID);
// obtain any existing failover metadata
Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(activeContainerResourceID);
// Iterate over the list of running standby containers, to find a standby resource that we have not already
// used for a failover for this active resoruce
for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
if (samzaApplicationState.runningProcessors.containsKey(standbyContainerID)) {
SamzaResource standbyContainerResource = samzaApplicationState.runningProcessors.get(standbyContainerID);
// use this standby if there was no previous failover for which this standbyResource was used
if (!(failoverMetadata.isPresent() && failoverMetadata.get()
.isStandbyResourceUsed(standbyContainerResource.getContainerId()))) {
log.info("Returning standby container {} in running state on host {} for active container {}",
standbyContainerID, standbyContainerResource.getHost(), activeContainerID);
return standbyContainerResource.getHost();
}
}
}
log.info("Did not find any running standby container for active container {}", activeContainerID);
// We iterate over the list of last-known standbyHosts to check if anyone of them has not already been tried
for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) {
String standbyHost =
Optional.ofNullable(localityManager.readLocality().getProcessorLocality(standbyContainerID))
.map(ProcessorLocality::host)
.orElse(null);
if (StringUtils.isNotBlank(standbyHost)) {
log.info("No last known standbyHost for container {}", standbyContainerID);
} else if (failoverMetadata.isPresent() && failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
log.info("Not using standby host {} for active container {} because it had already been selected", standbyHost,
activeContainerID);
} else {
// standbyHost is valid and has not already been selected
log.info("Returning standby host {} for active container {}", standbyHost, activeContainerID);
return standbyHost;
}
}
log.info("Did not find any standby host for active container {}, returning any-host", activeContainerID);
return ResourceRequestState.ANY_HOST;
}
/**
* Register the failure of an active container (identified by its resource ID).
*/
private FailoverMetadata registerActiveContainerFailure(String activeContainerID, String activeContainerResourceID) {
// this active container's resource ID is already registered, in which case update the metadata
FailoverMetadata failoverMetadata;
if (failovers.containsKey(activeContainerResourceID)) {
failoverMetadata = failovers.get(activeContainerResourceID);
} else {
failoverMetadata = new FailoverMetadata(activeContainerID, activeContainerResourceID);
}
this.failovers.put(activeContainerResourceID, failoverMetadata);
return failoverMetadata;
}
/**
* Check if this standbyContainerResource is present in the failoverState for an active container.
* This is used to determine if we requested a stop a container.
*/
private Optional<FailoverMetadata> checkIfUsedForFailover(String standbyContainerResourceId) {
if (standbyContainerResourceId == null) {
return Optional.empty();
}
for (FailoverMetadata failoverMetadata : failovers.values()) {
if (failoverMetadata.isStandbyResourceUsed(standbyContainerResourceId)) {
log.info("Standby container with resource id {} was selected for failover of active container {}",
standbyContainerResourceId, failoverMetadata.activeContainerID);
return Optional.of(failoverMetadata);
}
}
return Optional.empty();
}
/**
* Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints.
*
* @param containerIdToStart logical id of the container to start
* @param host potential host to start the container on
* @return
*/
boolean checkStandbyConstraints(String containerIdToStart, String host) {
List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIdToStart);
// Check if any of these conflicting containers are running/launching on host
for (String containerID : containerIDsForStandbyConstraints) {
SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
// return false if a conflicting container is pending for launch on the host
if (resource != null && resource.getHost().equals(host)) {
log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
containerIdToStart, host, containerID);
return false;
}
// return false if a conflicting container is running on the host
resource = samzaApplicationState.runningProcessors.get(containerID);
if (resource != null && resource.getHost().equals(host)) {
log.info("Container {} cannot be started on host {} because container {} is already running on this host",
containerIdToStart, host, containerID);
return false;
}
}
return true;
}
/**
* Attempt to the run a container on the given candidate resource, if doing so meets the standby container constraints.
* @param request The Samza container request
* @param preferredHost the preferred host associated with the container
* @param samzaResource the resource candidate
*/
public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost,
SamzaResource samzaResource, ContainerAllocator containerAllocator,
ResourceRequestState resourceRequestState) {
String containerID = request.getProcessorId();
if (checkStandbyConstraints(containerID, samzaResource.getHost())) {
// This resource can be used to launch this container
log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
samzaResource.getHost(), preferredHost);
containerAllocator.runStreamProcessor(request, preferredHost);
} else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
// This resource cannot be used to launch this standby container, so we make a new anyhost request
log.info(
"Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
containerID, samzaResource.getHost());
releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
samzaApplicationState.failedStandbyAllocations.incrementAndGet();
} else {
// This resource cannot be used to launch this active container container, so we initiate a failover
log.warn(
"Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource",
containerID, samzaResource.getHost());
releaseUnstartableContainer(request, samzaResource, preferredHost, resourceRequestState);
Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
String lastKnownResourceID =
failoverMetadata.isPresent() ? failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
initiateStandbyAwareAllocation(containerID, lastKnownResourceID, containerAllocator);
samzaApplicationState.failedStandbyAllocations.incrementAndGet();
}
}
/**
* Handle an expired resource request
* @param containerID the containerID for which the resource request was made
* @param request the expired resource request
* @param alternativeResource an alternative, already-allocated, resource (if available)
* @param containerAllocator the container allocator (used to issue any required subsequent resource requests)
* @param resourceRequestState used to cancel resource requests if required.
*/
public void handleExpiredResourceRequest(String containerID, SamzaResourceRequest request,
Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
ResourceRequestState resourceRequestState) {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
handleExpiredRequestForStandbyContainer(containerID, request, alternativeResource, containerAllocator,
resourceRequestState);
} else {
handleExpiredRequestForActiveContainer(containerID, request, containerAllocator, resourceRequestState);
}
}
/**
* Fetches a list of standby container for an active container
* @param activeContainerId logical id of the container ex: 0,1,2
* @return list of standby containers ex: for active container 0: {0-0, 0-1}
*/
List<String> getStandbyList(String activeContainerId) {
return this.standbyContainerConstraints.get(activeContainerId);
}
/**
* Release un-startable resources immediately and deletes requests corresponsing to it
*/
void releaseUnstartableContainer(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
ResourceRequestState resourceRequestState) {
resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
resourceRequestState.cancelResourceRequest(request);
}
// Handle an expired resource request that was made for placing a standby container
private void handleExpiredRequestForStandbyContainer(String containerID, SamzaResourceRequest request,
Optional<SamzaResource> alternativeResource, ContainerAllocator containerAllocator,
ResourceRequestState resourceRequestState) {
if (alternativeResource.isPresent()) {
// A standby container can be started on the anyhost-alternative-resource rightaway provided it passes all the
// standby constraints
log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID,
alternativeResource.get());
checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(),
containerAllocator, resourceRequestState);
} else {
// If there is no alternative-resource for the standby container we make a new anyhost request
log.info("Handling expired request, requesting anyHost resource for standby container {}", containerID);
resourceRequestState.cancelResourceRequest(request);
containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST);
}
}
// Handle an expired resource request that was made for placing an active container
private void handleExpiredRequestForActiveContainer(String containerID, SamzaResourceRequest request,
ContainerAllocator containerAllocator, ResourceRequestState resourceRequestState) {
log.info("Handling expired request for active container {}", containerID);
Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
resourceRequestState.cancelResourceRequest(request);
// we use the activeContainer's resourceID to initiate the failover and index failover-state
// if there is no prior failure for this active-Container, we use "unknown"
String lastKnownResourceID =
failoverMetadata.isPresent() ? failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
log.info("Handling expired request for active container {}, lastKnownResourceID is {}", containerID, lastKnownResourceID);
initiateStandbyAwareAllocation(containerID, lastKnownResourceID, containerAllocator);
}
/**
* Check if a activeContainerResource has failover-metadata associated with it
*/
private Optional<FailoverMetadata> getFailoverMetadata(String activeContainerResourceID) {
return this.failovers.containsKey(activeContainerResourceID) ? Optional.of(
this.failovers.get(activeContainerResourceID)) : Optional.empty();
}
/**
* Check if a SamzaResourceRequest was issued for a failover.
*/
private Optional<FailoverMetadata> getFailoverMetadata(SamzaResourceRequest resourceRequest) {
for (FailoverMetadata failoverMetadata : this.failovers.values()) {
if (failoverMetadata.containsResourceRequest(resourceRequest)) {
return Optional.of(failoverMetadata);
}
}
return Optional.empty();
}
@Override
public String toString() {
return this.failovers.toString();
}
/**
* Encapsulates metadata concerning the failover of an active container.
*/
public class FailoverMetadata {
public final String activeContainerID;
public final String activeContainerResourceID;
// Map of samza-container-resource ID to host, for each standby container selected for failover of the activeContainer
private final Map<String, String> selectedStandbyContainers;
// Resource requests issued during this failover
private final Set<SamzaResourceRequest> resourceRequests;
public FailoverMetadata(String activeContainerID, String activeContainerResourceID) {
this.activeContainerID = activeContainerID;
this.activeContainerResourceID = activeContainerResourceID;
this.selectedStandbyContainers = new HashMap<>();
resourceRequests = new HashSet<>();
}
// Check if this standbyContainerResourceID was used in this failover
public synchronized boolean isStandbyResourceUsed(String standbyContainerResourceID) {
return this.selectedStandbyContainers.keySet().contains(standbyContainerResourceID);
}
// Get the hostname corresponding to the standby resourceID
public synchronized String getStandbyContainerHostname(String standbyContainerResourceID) {
return selectedStandbyContainers.get(standbyContainerResourceID);
}
// Add the standbyContainer resource to the list of standbyContainers used in this failover
public synchronized void updateStandbyContainer(String standbyContainerResourceID, String standbyContainerHost) {
this.selectedStandbyContainers.put(standbyContainerResourceID, standbyContainerHost);
}
// Add the samzaResourceRequest to the list of resource requests associated with this failover
public synchronized void recordResourceRequest(SamzaResourceRequest samzaResourceRequest) {
this.resourceRequests.add(samzaResourceRequest);
}
// Check if this resource request has been issued in this failover
public synchronized boolean containsResourceRequest(SamzaResourceRequest samzaResourceRequest) {
return this.resourceRequests.contains(samzaResourceRequest);
}
// Check if this standby-host has been used in this failover, that is, if this host matches the host of a selected-standby,
// or if we have made a resourceRequest for this host
public boolean isStandbyHostUsed(String standbyHost) {
return this.selectedStandbyContainers.values().contains(standbyHost)
|| this.resourceRequests.stream().filter(request -> request.getPreferredHost().equals(standbyHost)).count() > 0;
}
@Override
public String toString() {
return "[activeContainerID: " + this.activeContainerID + " activeContainerResourceID: "
+ this.activeContainerResourceID + " selectedStandbyContainers:" + selectedStandbyContainers
+ " resourceRequests: " + resourceRequests + "]";
}
}
}