blob: 24130fd27d73a12f47ce6334fae252dcad8e2e23 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.clustermanager;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.placement.ContainerPlacementMessage;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
import org.apache.samza.job.model.ProcessorLocality;
import org.apache.samza.util.BoundedLinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* ContainerManager is a centralized entity that manages container actions like start, stop for both active and standby containers
* ContainerManager acts as a brain for validating and issuing any actions on containers in the Job Coordinator.
* The requests to allocate resources resources made by {@link ContainerAllocator} can either expire or succeed.
* When the requests succeeds the ContainerManager validates those requests before starting the container
* When the requests expires the ContainerManager decides the next set of actions for the pending request.
* Callbacks issued from {@link ClusterResourceManager} aka {@link ContainerProcessManager} are intercepted
* by ContainerManager to handle container failure and completions for both active and standby containers
* ContainerManager encapsulates logic and state related to container placement actions like move, restarts for active container
* if issued externally.
public class ContainerManager {
private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
private static final String LAST_SEEN = "LAST_SEEN";
private static final int UUID_CACHE_SIZE = 20000;
* Container placement metadata store to write responses to control actions
private final ContainerPlacementMetadataStore containerPlacementMetadataStore;
* Resource-manager, used to stop containers
private final ClusterResourceManager clusterResourceManager;
private final SamzaApplicationState samzaApplicationState;
private final boolean hostAffinityEnabled;
* Map maintaining active container placement action meta data indexed by container's processorId eg 0, 1, 2
* Key is chosen to be processorId since at a time only one placement action can be in progress on a container.
private final ConcurrentHashMap<String, ContainerPlacementMetadata> actions;
* In-memory cache of placement requests UUIDs de-queued from the metadata store. Used to de-dup requests with the same
* request UUID. Sized using max tolerable memory footprint and max likely duplicate-spacing.
private final BoundedLinkedHashSet<UUID> placementRequestsCache;
private final Optional<StandbyContainerManager> standbyContainerManager;
private final LocalityManager localityManager;
public ContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
SamzaApplicationState samzaApplicationState, ClusterResourceManager clusterResourceManager,
boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager localityManager) {
Preconditions.checkNotNull(localityManager, "Locality manager cannot be null");
this.samzaApplicationState = samzaApplicationState;
this.clusterResourceManager = clusterResourceManager;
this.actions = new ConcurrentHashMap<>();
this.placementRequestsCache = new BoundedLinkedHashSet<UUID>(UUID_CACHE_SIZE);
this.hostAffinityEnabled = hostAffinityEnabled;
this.containerPlacementMetadataStore = containerPlacementMetadataStore;
this.localityManager = localityManager;
// Enable standby container manager if required
if (standByEnabled) {
this.standbyContainerManager =
Optional.of(new StandbyContainerManager(samzaApplicationState, clusterResourceManager, localityManager));
} else {
this.standbyContainerManager = Optional.empty();
* Handles the container start action for both active & standby containers. This method is invoked by the allocator thread
* Case 1. If the container launch request is due to an existing container placement action, issue a stop on active
* container & wait for the active container to be stopped before issuing a start.
* Case 2. If StandbyContainer is present refer to {@code StandbyContainerManager#checkStandbyConstraintsAndRunStreamProcessor}
* Case 3. Otherwise just invoke a container start on the allocated resource for the pending request
* TODO: SAMZA-2399: Investigate & configure a timeout for container stop if needed
* @param request pending request for the preferred host
* @param preferredHost preferred host to start the container
* @param allocatedResource resource allocated from {@link ClusterResourceManager}
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator to request resources from @{@link ClusterResourceManager}
* @return true if the container launch is complete, false if the container launch is in progress. A container launch
* might be in progress when it is waiting for the previous container incarnation to stop in case of container
* placement actions
boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
if (hasActiveContainerPlacementAction(request.getProcessorId())) {
String processorId = request.getProcessorId();
ContainerPlacementMetadata actionMetaData = getPlacementActionMetadata(processorId).get();
ContainerPlacementMetadata.ContainerStatus actionStatus = actionMetaData.getContainerStatus();
if (samzaApplicationState.runningProcessors.containsKey(processorId) && actionStatus == ContainerPlacementMetadata.ContainerStatus.RUNNING) {
LOG.debug("Requesting running container to shutdown due to existing ContainerPlacement action {}", actionMetaData);
updateContainerPlacementActionStatus(actionMetaData, ContainerPlacementMessage.StatusCode.IN_PROGRESS,
"Active container stop in progress");
return false;
} else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS) {"Waiting for running container to shutdown due to existing ContainerPlacement action {}", actionMetaData);
return false;
} else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOP_FAILED) {"Shutdown on running container failed for action {}", actionMetaData);
String.format("failed to stop container on current host %s", actionMetaData.getSourceHost()));
return true;
} else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOPPED) {
// If the job has standby containers enabled, always check standby constraints before issuing a start on container
// Note: Always check constraints against allocated resource, since preferred host can be ANY_HOST as well
if (standbyContainerManager.isPresent() && !standbyContainerManager.get().checkStandbyConstraints(request.getProcessorId(), allocatedResource.getHost())) {
"Starting container {} on host {} does not meet standby constraints, falling back to source host placement metadata: {}",
request.getProcessorId(), preferredHost, actionMetaData);
// Release unstartable container
standbyContainerManager.get().releaseUnstartableContainer(request, allocatedResource, preferredHost, resourceRequestState);
// Fallback to source host since the new allocated resource does not meet standby constraints
allocator.requestResource(processorId, actionMetaData.getSourceHost());
String.format("allocated resource %s does not meet standby constraints now, falling back to source host", allocatedResource));
} else {"Status updated for ContainerPlacement action: ", actionMetaData);
allocator.runStreamProcessor(request, preferredHost);
return true;
if (this.standbyContainerManager.isPresent()) {
.checkStandbyConstraintsAndRunStreamProcessor(request, preferredHost, allocatedResource, allocator,
} else {
allocator.runStreamProcessor(request, preferredHost);
return true;
* Handles the action to be taken after the container has been stopped. If stop was issued due to existing control
* action, mark the container as stopped, otherwise
* Case 1. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerStop} to check constraints.
* Case 2. When standby is disabled there are two cases according to host-affinity being enabled
* Case 2.1. When host-affinity is enabled resources are requested on host where container was last seen
* Case 2.2. When host-affinity is disabled resources are requested for ANY_HOST
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container was last deployed
* @param exitStatus exit code returned by the container
* @param preferredHostRetryDelay delay to be incurred before requesting resources
* @param containerAllocator allocator for requesting resources
void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {
if (hasActiveContainerPlacementAction(processorId)) {
ContainerPlacementMetadata metadata = getPlacementActionMetadata(processorId).get();"Setting the container state with Processor ID: {} to be stopped because of existing ContainerPlacement action: {}",
processorId, metadata);
} else if (standbyContainerManager.isPresent()) {
.handleContainerStop(processorId, containerId, preferredHost, exitStatus, containerAllocator,
} else {
// If StandbyTasks are not enabled, we simply make a request for the preferredHost
containerAllocator.requestResourceWithDelay(processorId, preferredHost, preferredHostRetryDelay);
* Handle the container launch failure for active containers and standby (if enabled).
* Case 1. If this launch was issued due to an existing container placement action update the metadata to report failure and issue
* a request for source host where the container was last seen and mark the container placement failed
* Case 2. When standby is enabled, refer to {@link StandbyContainerManager#handleContainerLaunchFail} to check behavior
* Case 3. When standby is disabled the allocator issues a request for ANY_HOST resources
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
ContainerAllocator containerAllocator) {
if (processorId != null && hasActiveContainerPlacementAction(processorId)) {
ContainerPlacementMetadata metaData = getPlacementActionMetadata(processorId).get();
// Issue a request to start the container on source host
String sourceHost = hostAffinityEnabled ? metaData.getSourceHost() : ResourceRequestState.ANY_HOST;
String.format("failed to start container on destination host %s, attempting to start on source host %s",
preferredHost, sourceHost));
containerAllocator.requestResource(processorId, sourceHost);
} else if (processorId != null && standbyContainerManager.isPresent()) {
standbyContainerManager.get().handleContainerLaunchFail(processorId, containerId, containerAllocator);
} else if (processorId != null) {"Falling back to ANY_HOST for Processor ID: {} since launch failed for Container ID: {} on host: {}",
processorId, containerId, preferredHost);
containerAllocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
} else {
LOG.warn("Did not find a pending Processor ID for Container ID: {} on host: {}. "
+ "Ignoring invalid/redundant notification.", containerId, preferredHost);
* Handle the container stop failure for active containers and standby (if enabled).
* @param processorId logical id of the container eg 1,2,3
* @param containerId last known id of the container deployed
* @param containerHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
* TODO: SAMZA-2512 Add integ test for handleContainerStopFail
void handleContainerStopFail(String processorId, String containerId, String containerHost,
ContainerAllocator containerAllocator) {
if (processorId != null && hasActiveContainerPlacementAction(processorId)) {
// Assuming resource acquired on destination host will be relinquished by the containerAllocator,
// We mark the placement action as failed, and return.
ContainerPlacementMetadata metaData = getPlacementActionMetadata(processorId).get();
} else if (processorId != null && standbyContainerManager.isPresent()) {
standbyContainerManager.get().handleContainerStopFail(processorId, containerId, containerAllocator);
} else {
LOG.warn("Did not find a running Processor ID for Container ID: {} on host: {}. "
+ "Ignoring invalid/redundant notification.", containerId, containerHost);
* Handles the state update on successful launch of a container, if this launch is due to a container placement action updates the
* related metadata to report success
* @param processorId logical processor id of container 0,1,2
void handleContainerLaunchSuccess(String processorId, String containerHost) {
if (hasActiveContainerPlacementAction(processorId)) {
ContainerPlacementMetadata metadata = getPlacementActionMetadata(processorId).get();
// Mark the active container running again and dispatch a response
updateContainerPlacementActionStatus(metadata, ContainerPlacementMessage.StatusCode.SUCCEEDED,
"Successfully completed the container placement action started container on host " + containerHost);
* Handles an expired resource request for both active and standby containers.
* Case 1. If this expired request is due to a container placement action mark the request as failed and return
* Case 2: Otherwise for a normal resource request following cases are possible
* Case 2.1 If StandbyContainer is present refer to {@code StandbyContainerManager#handleExpiredResourceRequest}
* Case 2.2: host-affinity is enabled, allocator thread looks for allocated resources on ANY_HOST and issues a
* container start if available, otherwise issue an ANY_HOST request
* Case 2.2: host-affinity is disabled, allocator thread does not handle expired requests, it waits for cluster
* manager to return resources on ANY_HOST
* @param processorId logical id of the container
* @param preferredHost host on which container is requested to be deployed
* @param request pending request for the preferred host
* @param allocator allocator for requesting resources
* @param resourceRequestState state of request in {@link ContainerAllocator}
void handleExpiredRequest(String processorId, String preferredHost,
SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {
boolean resourceAvailableOnAnyHost = allocator.hasAllocatedResource(ResourceRequestState.ANY_HOST);
// Case 1. Container placement actions can be taken in either cases of host affinity being set, in both cases
// mark the container placement action failed
if (hasActiveContainerPlacementAction(processorId)) {
"failed the ContainerPlacement action because request for resources to ClusterManager expired");
// Case 2. When host affinity is disabled wait for cluster resource manager return resources
// TODO: SAMZA-2330: Handle expired request for host affinity disabled case by retying request for getting ANY_HOST
if (!hostAffinityEnabled) {
// Case 2. When host affinity is enabled handle the expired requests
if (standbyContainerManager.isPresent()) {
.handleExpiredResourceRequest(processorId, request,
Optional.ofNullable(allocator.peekAllocatedResource(ResourceRequestState.ANY_HOST)), allocator,
} else if (resourceAvailableOnAnyHost) {"Request for Processor ID: {} on host: {} has expired. Running on ANY_HOST", processorId, preferredHost);
allocator.runStreamProcessor(request, ResourceRequestState.ANY_HOST);
} else {"Request for Processor ID: {} on host: {} has expired. Requesting additional resources on ANY_HOST.",
processorId, preferredHost);
allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
* Handles expired allocated resource by requesting the same resource again and release the expired allocated resource
* @param request pending request for the preferred host
* @param resource resource allocated from {@link ClusterResourceManager} which has expired
* @param preferredHost host on which container is requested to be deployed
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator allocator for requesting resources
void handleExpiredResource(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
ResourceRequestState resourceRequestState, ContainerAllocator allocator) {"Allocated resource {} has expired for Processor ID: {} request: {}. Re-requesting resource again",
resource, request.getProcessorId(), request);
resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
SamzaResourceRequest newResourceRequest = allocator.getResourceRequest(request.getProcessorId(), request.getPreferredHost());
if (hasActiveContainerPlacementAction(newResourceRequest.getProcessorId())) {
ContainerPlacementMetadata metadata = getPlacementActionMetadata(request.getProcessorId()).get();
* Registers a container placement action to move the running container to destination host, if destination host is same as the
* host on which container is running, container placement action is treated as a restart.
* When host affinity is disabled a move / restart is only allowed on ANY_HOST
* When host affinity is enabled move / restart is allowed on specific or ANY_HOST
* Container placement requests are tied to deploymentId which is currently {@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
* On job restarts container placement requests queued for the previous deployment are deleted using this
* All kinds of container placement request except for when destination host is "FORCE_RESTART_LAST_SEEN" work with
* a RESERVE - STOP - START policy, which means resources are accrued first before issuing a container stop, failure to
* do so will leave the running container untouched. Requests with destination host "FORCE_RESTART_LAST_SEEN" works with
* STOP - RESERVE - START policy, which means running container is stopped first then resource request are issued, this case
* is equivalent to doing a kill -9 on a container
* @param requestMessage request containing logical processor id 0,1,2 and host where container is desired to be moved,
* acceptable values of this param are
* - valid hostname
* - "ANY_HOST" in this case the request is sent to resource manager for any host
* - "LAST_SEEN" in this case request is sent to resource manager for last seen host
* - "FORCE_RESTART_LAST_SEEN" in this case request is sent to resource manager for last seen host
* @param containerAllocator to request physical resources
public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {
String processorId = requestMessage.getProcessorId();
String destinationHost = requestMessage.getDestinationHost();
// Is the action ready to be de-queued and taken or it needs to wait to be executed in future
if (!deQueueAction(requestMessage)) {
}"ContainerPlacement action is de-queued metadata: {}", requestMessage);
Pair<ContainerPlacementMessage.StatusCode, String> actionStatus = validatePlacementAction(requestMessage);
// Action is de-queued upon so we record it in the cache
// Remove the request message from metastore since this message is already acted upon
// Request is bad just update the response on message & return
if (actionStatus.getKey() == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {"Status updated for ContainerPlacement action request: {} response: {}", requestMessage, actionStatus.getValue());
writeContainerPlacementResponseMessage(requestMessage, actionStatus.getKey(), actionStatus.getValue());
* When destination host is {@code FORCE_RESTART_LAST_SEEN} its treated as eqvivalent to kill -9 operation for the container
* In this scenario container is stopped first and we fallback to normal restart path so the policy here is
* stop - reserve - move
if (destinationHost.equals(FORCE_RESTART_LAST_SEEN)) {"Issuing a force restart for Processor ID: {} for ContainerPlacement action request {}", processorId, requestMessage);
writeContainerPlacementResponseMessage(requestMessage, ContainerPlacementMessage.StatusCode.SUCCEEDED,
"Successfully issued a stop container request falling back to normal restart path");
* When destination host is {@code LAST_SEEN} its treated as a restart request on the host where container is running
* on or has been seen last, but in this policy would be reserve - stop - move, which means reserve resources first
* only if resources are accrued stop the active container and issue a start on it on resource acquired
if (destinationHost.equals(LAST_SEEN)) {
String lastSeenHost = getSourceHostForContainer(requestMessage);"Changing the requested host for placement action to {} because requested host is LAST_SEEN", lastSeenHost);
destinationHost = lastSeenHost;
// TODO: SAMZA-2457: Allow host affinity disabled jobs to move containers to specific host
if (!hostAffinityEnabled) {"Changing the requested host for placement action to {} because host affinity is disabled", ResourceRequestState.ANY_HOST);
destinationHost = ANY_HOST;
// Register metadata
ContainerPlacementMetadata actionMetaData = new ContainerPlacementMetadata(requestMessage, getSourceHostForContainer(requestMessage));
actions.put(processorId, actionMetaData);
// If the job is running in a degraded state then the container is already stopped
if (samzaApplicationState.failedProcessors.containsKey(requestMessage.getProcessorId())) {
SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(processorId, destinationHost);
// Record the resource request for monitoring
actions.put(processorId, actionMetaData);
updateContainerPlacementActionStatus(actionMetaData, ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources requested");
* This method is only used for Testing the Container Placement actions to get a hold of {@link ContainerPlacementMetadata}
* for assertions. Not intended to be used in src
ContainerPlacementMetadata registerContainerPlacementActionForTest(ContainerPlacementRequestMessage requestMessage,
ContainerAllocator containerAllocator) {
registerContainerPlacementAction(requestMessage, containerAllocator);
if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
return getPlacementActionMetadata(requestMessage.getProcessorId()).get();
return null;
public Optional<Duration> getActionExpiryTimeout(SamzaResourceRequest resourceRequest) {
for (ContainerPlacementMetadata actionMetadata : actions.values()) {
if (actionMetadata.containsResourceRequest(resourceRequest)
&& actionMetadata.getActionStatus() == ContainerPlacementMessage.StatusCode.IN_PROGRESS) {
return actionMetadata.getRequestActionExpiryTimeout();
return Optional.empty();
private void markContainerPlacementActionFailed(ContainerPlacementMetadata metaData, String failureMessage) {
updateContainerPlacementActionStatus(metaData, ContainerPlacementMessage.StatusCode.FAILED, failureMessage);
* A ContainerPlacementAction is only active if it is either CREATED, ACCEPTED or IN_PROGRESS
private boolean hasActiveContainerPlacementAction(String processorId) {
Optional<ContainerPlacementMetadata> metadata = getPlacementActionMetadata(processorId);
if (metadata.isPresent()) {
switch (metadata.get().getActionStatus()) {
return true;
return false;
return false;
* Check if a activeContainerResource has control-action-metadata associated with it
private Optional<ContainerPlacementMetadata> getPlacementActionMetadata(String processorId) {
return Optional.ofNullable(this.actions.get(processorId));
private void updateContainerPlacementActionStatus(ContainerPlacementMetadata metadata,
ContainerPlacementMessage.StatusCode statusCode, String responseMessage) {
metadata.setActionStatus(statusCode, responseMessage);
writeContainerPlacementResponseMessage(metadata.getRequestMessage(), statusCode, responseMessage);"Status updated for ContainerPlacement action: {}", metadata);
private void writeContainerPlacementResponseMessage(ContainerPlacementRequestMessage requestMessage,
ContainerPlacementMessage.StatusCode statusCode, String responseMessage) {
ContainerPlacementResponseMessage.fromContainerPlacementRequestMessage(requestMessage, statusCode,
responseMessage, System.currentTimeMillis()));
* Gets the hostname on which container is either currently running or was last seen on if it is not running
* TODO SAMZA-2480: Move logic related to onResourcesCompleted from ContainerProcessManager to ContainerManager
private String getSourceHostForContainer(ContainerPlacementRequestMessage requestMessage) {
String sourceHost = null;
String processorId = requestMessage.getProcessorId();
if (samzaApplicationState.runningProcessors.containsKey(processorId)) {
SamzaResource currentResource = samzaApplicationState.runningProcessors.get(processorId);"Processor ID: {} matched a running container with containerId ID: {} is running on host: {} for ContainerPlacement action: {}",
processorId, currentResource.getContainerId(), currentResource.getHost(), requestMessage);
sourceHost = currentResource.getHost();
} else {
sourceHost = Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
.orElse(null);"Processor ID: {} is not running and was last seen on host: {} for ContainerPlacement action: {}",
processorId, sourceHost, requestMessage);
return sourceHost;
* These are specific scenarios in which a placement action should wait for existing action to complete before it is executed
* 1. If there is an placement request in progress on active container
* 2. If there is an placement request is progress on any of its standby container
* 3. If the container itself is pending a start
* @param requestMessage container placement request message
* @return true if action should be taken right now, false if it needs to wait to be taken in future
private boolean deQueueAction(ContainerPlacementRequestMessage requestMessage) {
// Do not dequeue action wait for the in-flight action to complete
if (checkIfActiveOrStandbyContainerHasActivePlacementAction(requestMessage)) {
return false;
if (samzaApplicationState.failedProcessors.containsKey(requestMessage.getProcessorId())) {"ContainerPlacement request: {} is de-queued, container with Processor ID: {} has exhausted all retries and is in failed state",
requestMessage, requestMessage.getProcessorId());
return true;
// Do not dequeue the action wait for the container to come to a running state
if (!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
|| samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId())) {"ContainerPlacement request: {} is en-queued because container is pending start", requestMessage);
return false;
return true;
* A valid container placement action needs a valid processor id. Duplicate actions are handled by de-duping on uuid.
* If standby containers are enabled destination host requested must meet standby constraints
* @param requestMessage container placement request message
* @return Pair<ContainerPlacementMessage.StatusCode, String> which is status code & response suggesting if the request is valid
private Pair<ContainerPlacementMessage.StatusCode, String> validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
String errorMessagePrefix = ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: %s";
Boolean invalidAction = false;
String errorMessage = null;
boolean isRunning = samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId());
boolean isPending = samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId());
boolean isFailed = samzaApplicationState.failedProcessors.containsKey(requestMessage.getProcessorId());
if (!isRunning && !isPending && !isFailed) {
errorMessage = String.format(errorMessagePrefix, "invalid processor id neither in running, pending or failed processors");
invalidAction = true;
} else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
errorMessage = String.format(errorMessagePrefix, "duplicate UUID of the request, please retry");
invalidAction = true;
} else if (standbyContainerManager.isPresent() && !standbyContainerManager.get()
.checkStandbyConstraints(requestMessage.getProcessorId(), requestMessage.getDestinationHost())) {
errorMessage = String.format(errorMessagePrefix, "destination host does not meet standby constraints");
invalidAction = true;
if (invalidAction) {
return new ImmutablePair<>(ContainerPlacementMessage.StatusCode.BAD_REQUEST, errorMessage);
return new ImmutablePair<>(ContainerPlacementMessage.StatusCode.ACCEPTED, "Request is accepted");
* Checks if there are any active container placement action on container itself or on any of its standby containers
* (if enabled)
private boolean checkIfActiveOrStandbyContainerHasActivePlacementAction(ContainerPlacementRequestMessage request) {
String processorId = request.getProcessorId();
// Container itself has active container placement actions
if (hasActiveContainerPlacementAction(processorId)) {"ContainerPlacement request: {} is en-queued because container has an in-progress placement action", request);
return true;
if (standbyContainerManager.isPresent()) {
// If requested placement action is on a standby container and its active container has a placement request,
// this request shall not be de-queued until in-flight action on active container is complete
if (StandbyTaskUtil.isStandbyContainer(processorId) && hasActiveContainerPlacementAction(
StandbyTaskUtil.getActiveContainerId(processorId))) {"ContainerPlacement request: {} is en-queued because its active container has an in-progress placement action", request);
return true;
// If requested placement action is on a standby container and its active container has a placement request,
// this request shall not be de-queued until in-flight action on active container is complete
if (!StandbyTaskUtil.isStandbyContainer(processorId)) {
for (String standby : standbyContainerManager.get().getStandbyList(processorId)) {
if (hasActiveContainerPlacementAction(standby)) {"ContainerPlacement request: {} is en-queued because one of its standby replica has an in-progress placement action", request);
return true;
return false;