blob: 57ce35099973b7fc3414c450e3246cb9f204289b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.job.yarn;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Maintains the state for all outstanding container requests and the containers allocated by the RM.
 *
 * <p>Allocated containers are buffered per host. When host-affinity is disabled, or when an allocated
 * container's host does not (or no longer) match an outstanding request, the container is buffered under
 * {@link #ANY_HOST}.
 *
 * <p>Important: although the internal collections are concurrent data structures and the mutating methods are
 * {@code synchronized}, the accessors {@link #getContainersOnAHost(String)}, {@link #getRequestsQueue()} and
 * {@link #getRequestsToCountMap()} return the live internal collections. This class is therefore not thread-safe
 * as a whole; thread safety has to be handled by the caller.
 */
public class ContainerRequestState {
  private static final Logger log = LoggerFactory.getLogger(ContainerRequestState.class);

  /** Buffer key for allocated containers that are not tied to a specific requested host. */
  public static final String ANY_HOST = "ANY_HOST";

  /**
   * Maps a hostname (or {@link #ANY_HOST}) to the list of containers allocated on that host that have not yet
   * been assigned.
   */
  private final ConcurrentHashMap<String, List<Container>> allocatedContainers = new ConcurrentHashMap<String, List<Container>>();

  /**
   * Queue of container requests made by the {@link org.apache.samza.job.yarn.SamzaTaskManager}.
   */
  private final PriorityBlockingQueue<SamzaContainerRequest> requestsQueue = new PriorityBlockingQueue<SamzaContainerRequest>();

  /**
   * Maps a hostname to the number of outstanding requests for containers on that host.
   * It is used to look up whether an allocated container's host was ever requested.
   * This map is only updated when host-affinity is enabled.
   */
  private final ConcurrentHashMap<String, AtomicInteger> requestsToCountMap = new ConcurrentHashMap<String, AtomicInteger>();

  /**
   * Indicates whether host-affinity is enabled.
   */
  private final boolean hostAffinityEnabled;

  /** Client used to add and remove container requests and to release containers. */
  private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;

  // TODO: Refactor such that the state class for host-affinity enabled allocator is a subclass of a generic state class
  public ContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient,
      boolean hostAffinityEnabled) {
    this.amClient = amClient;
    this.hostAffinityEnabled = hostAffinityEnabled;
  }

  /**
   * Records a new container request and forwards it to the RM.
   *
   * <p>This method is called every time the {@link org.apache.samza.job.yarn.SamzaTaskManager} requests a
   * container. It sends the issued request to the RM and adds the
   * {@link org.apache.samza.job.yarn.SamzaContainerRequest} to the requests queue. If host-affinity is enabled,
   * it also increments the request count for the preferred host. It then makes sure an (empty) allocated-container
   * buffer exists for that host.
   *
   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that is sent to the RM
   */
  public synchronized void updateRequestState(SamzaContainerRequest request) {
    log.info("Requesting a container for {} at {}", request.getExpectedContainerId(), request.getPreferredHost());
    amClient.addContainerRequest(request.getIssuedRequest());
    requestsQueue.add(request);
    String preferredHost = request.getPreferredHost();
    if (hostAffinityEnabled) {
      AtomicInteger requestCount = requestsToCountMap.get(preferredHost);
      if (requestCount == null) {
        requestsToCountMap.put(preferredHost, new AtomicInteger(1));
      } else {
        requestCount.incrementAndGet();
      }
      // Correlating allocated containers with earlier requests relies on this: if the preferred host is
      // requested for the first time, its allocatedContainers list must be empty and NOT null.
      if (!allocatedContainers.containsKey(preferredHost)) {
        allocatedContainers.put(preferredHost, new ArrayList<Container>());
      }
    }
  }

  /**
   * Buffers a container that the RM has allocated.
   *
   * <p>This method is called every time the RM returns an allocated container. If host-affinity is disabled, the
   * container goes into the {@link #ANY_HOST} buffer. Otherwise it is buffered under its own host only while that
   * host has fewer buffered containers than outstanding requests. In every other case it goes into the
   * {@link #ANY_HOST} buffer.
   *
   * @param container container resource that was returned by the RM
   */
  public synchronized void addContainer(Container container) {
    if (!hostAffinityEnabled) {
      log.debug("Saving the container {} in the buffer for ANY_HOST", container.getId());
      addToAllocatedContainerList(ANY_HOST, container);
      return;
    }

    String hostName = container.getNodeHttpAddress().split(":")[0];
    AtomicInteger requestCount = requestsToCountMap.get(hostName);
    int requestCountOnThisHost = requestCount == null ? 0 : requestCount.get();

    // Treat any non-positive count as "host not requested". This guarantees every container lands in some buffer,
    // so it can later be assigned or released. A negative count must not cause the container to be dropped.
    if (requestCountOnThisHost <= 0) {
      log.debug(
          "Request count for the allocatedContainer on {} is null or 0. This means that the host was not requested " +
              "for running containers.Hence, saving the container {} in the buffer for ANY_HOST",
          hostName,
          container.getId()
      );
      addToAllocatedContainerList(ANY_HOST, container);
      return;
    }

    List<Container> allocatedContainersOnThisHost = allocatedContainers.get(hostName);
    if (allocatedContainersOnThisHost == null || allocatedContainersOnThisHost.size() < requestCountOnThisHost) {
      log.debug("Saving the container {} in the buffer for {}", container.getId(), hostName);
      addToAllocatedContainerList(hostName, container);
    } else {
      // The RM may allocate more containers on a host than were requested for it. Once the host's buffer already
      // holds as many containers as there are outstanding requests, any surplus container goes to ANY_HOST.
      log.debug(
          "The number of containers already allocated on {} is greater than what was " +
              "requested, which is {}. Hence, saving the container {} in the buffer for ANY_HOST",
          new Object[]{
              hostName,
              requestCountOnThisHost,
              container.getId()
          }
      );
      addToAllocatedContainerList(ANY_HOST, container);
    }
  }

  /**
   * Appends a container to the buffer for the given host, creating the buffer if it does not exist yet.
   */
  private void addToAllocatedContainerList(String host, Container container) {
    List<Container> list = allocatedContainers.get(host);
    if (list == null) {
      list = new ArrayList<Container>();
      allocatedContainers.put(host, list);
    }
    list.add(container);
  }

  /**
   * Updates the state after a request is fulfilled and a container starts running on a host.
   *
   * <p>It removes the request from the queue and the container from the assigned host's buffer. It then
   * decrements the preferred host's request count (when host-affinity is enabled) and removes the issued request
   * from the RM so that no excess containers are returned.
   *
   * <p>This method is synchronized because the state buffers are populated by the AMRMCallbackHandler but
   * drained by the allocator thread.
   *
   * @param request {@link org.apache.samza.job.yarn.SamzaContainerRequest} that was fulfilled
   * @param assignedHost host to which the container was assigned
   * @param container allocated container resource that was used to satisfy this request
   */
  public synchronized void updateStateAfterAssignment(SamzaContainerRequest request, String assignedHost, Container container) {
    requestsQueue.remove(request);

    List<Container> containersOnAssignedHost = allocatedContainers.get(assignedHost);
    if (containersOnAssignedHost != null) {
      containersOnAssignedHost.remove(container);
    } else {
      log.warn("No allocated-container buffer exists for host {} while assigning container {}",
          assignedHost, container.getId());
    }

    if (hostAffinityEnabled) {
      // assignedHost may not always be the preferred host.
      // Hence, we should safely decrement the counter for the preferredHost
      AtomicInteger requestCount = requestsToCountMap.get(request.getPreferredHost());
      if (requestCount != null) {
        requestCount.decrementAndGet();
      } else {
        log.warn("No request count exists for preferred host {} of request {}",
            request.getPreferredHost(), request.getExpectedContainerId());
      }
    }

    // To avoid getting back excess containers
    amClient.removeContainerRequest(request.getIssuedRequest());
  }

  /**
   * Releases all buffered containers and clears the state, but only if the request queue is empty.
   *
   * <p>This method is synchronized because it modifies shared state buffers.
   *
   * @return the number of containers released (0 if the request queue is not empty)
   */
  public synchronized int releaseExtraContainers() {
    int numReleasedContainers = 0;
    if (requestsQueue.isEmpty()) {
      log.debug("Container Requests Queue is empty.");
      if (hostAffinityEnabled) {
        List<String> allocatedHosts = getAllocatedHosts();
        for (String host : allocatedHosts) {
          numReleasedContainers += releaseContainersForHost(host);
        }
      } else {
        numReleasedContainers += releaseContainersForHost(ANY_HOST);
      }
      clearState();
    }
    return numReleasedContainers;
  }

  /**
   * Releases all allocated containers buffered for the specified host back to the RM.
   * The buffer itself is not modified here.
   *
   * @param host the host for which the containers should be released
   * @return the number of containers released
   */
  private int releaseContainersForHost(String host) {
    int numReleasedContainers = 0;
    List<Container> containers = getContainersOnAHost(host);
    if (containers != null) {
      for (Container c : containers) {
        log.info("Releasing extra container {} allocated on {}", c.getId(), host);
        amClient.releaseAssignedContainer(c.getId());
        numReleasedContainers++;
      }
    }
    return numReleasedContainers;
  }

  /**
   * Releases a container that was allocated and assigned but could not be started,
   * for example because of a ConnectException while trying to communicate with the NM.
   * This method assumes the specified container and its request have already been removed
   * from their respective queues; it does not touch any internal state.
   *
   * @param container the {@link Container} to release
   */
  public void releaseUnstartableContainer(Container container) {
    log.info("Releasing unstartable container {}", container.getId());
    amClient.releaseAssignedContainer(container.getId());
  }

  /**
   * Clears all the state variables.
   * This runs when there are no more unfulfilled requests. It is not synchronized itself; its only
   * caller, {@link #releaseExtraContainers()}, is.
   */
  private void clearState() {
    allocatedContainers.clear();
    requestsToCountMap.clear();
    requestsQueue.clear();
  }

  /**
   * Returns the hosts (including {@link #ANY_HOST}) that have at least one buffered allocated container.
   *
   * @return list of host names
   */
  private List<String> getAllocatedHosts() {
    List<String> hostKeys = new ArrayList<String>();
    for (Map.Entry<String, List<Container>> entry : allocatedContainers.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        hostKeys.add(entry.getKey());
      }
    }
    return hostKeys;
  }

  /**
   * Returns the live list of containers buffered for a given host.
   * If no buffer was ever created for the given host, it returns null.
   *
   * @param host hostname (or {@link #ANY_HOST})
   * @return list of containers allocated on the given host, or null
   */
  public List<Container> getContainersOnAHost(String host) {
    return allocatedContainers.get(host);
  }

  /**
   * Returns the live queue of outstanding container requests.
   *
   * @return the internal request queue (not a copy)
   */
  public PriorityBlockingQueue<SamzaContainerRequest> getRequestsQueue() {
    return requestsQueue;
  }

  /**
   * Returns the live map of hostname to outstanding request count. It is only populated when host-affinity
   * is enabled.
   *
   * @return the internal request-count map (not a copy)
   */
  public ConcurrentHashMap<String, AtomicInteger> getRequestsToCountMap() {
    return requestsToCountMap;
  }
}