| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.clustermanager; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.PriorityQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.commons.lang3.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * {@link ResourceRequestState} maintains the state variables for all the resource requests and the allocated resources returned |
| * by the cluster manager. |
| * |
| * This class is thread-safe, and can safely support concurrent accesses without any form of external synchronization. Currently, |
| * this state is shared across both the Allocator Thread, and the Callback handler thread. |
| * |
| */ |
| public class ResourceRequestState { |
| private static final Logger log = LoggerFactory.getLogger(ResourceRequestState.class); |
| public static final String ANY_HOST = "ANY_HOST"; |
| |
| /** |
| * Maintain a map of hostname to a list of resources allocated on this host |
| */ |
| private final Map<String, List<SamzaResource>> allocatedResources = new HashMap<>(); |
| |
| /** |
| * Represents the queue of resource requests made by the {@link ContainerProcessManager} |
| */ |
| private final PriorityQueue<SamzaResourceRequest> requestsQueue = new PriorityQueue<>(); |
| |
| /** |
| * Represents the queue of delayed resource requests made by the {@link ContainerProcessManager} |
| * The difference between requestsQueue and delayedRequestsQueue is that, any request present the requestsQueue has |
| * been sent out to the cluster resource manager, while requests in the delayedRequestsQueue will be sent to the |
| * cluster resource manager only when their delay reaches 0. |
| */ |
| private final DelayedRequestQueue delayedRequestsQueue = new DelayedRequestQueue(); |
| |
| /** |
| * Maintain a map of hostname to the number of requests made for resources on this host |
| * This state variable is used to look-up whether an allocated resource on a host was ever requested in the past. |
| * This map is not updated when host-affinity is not enabled |
| */ |
| private final Map<String, AtomicInteger> hostRequestCounts = new HashMap<>(); |
| |
| /** |
| * Indicates whether host-affinity is enabled or not |
| */ |
| private final boolean hostAffinityEnabled; |
| |
| private final ClusterResourceManager manager; |
| |
| private final Object lock = new Object(); |
| |
| public ResourceRequestState(boolean hostAffinityEnabled, ClusterResourceManager manager) { |
| this.hostAffinityEnabled = hostAffinityEnabled; |
| this.manager = manager; |
| } |
| |
| /** |
| * Sends a {@link SamzaResourceRequest} to the {@link ClusterResourceManager} and queues the {@link SamzaResourceRequest} |
| * to be matched with the {@link SamzaResource} when it is provisioned. |
| * |
| * @param request {@link SamzaResourceRequest} to be sent and queued. |
| */ |
| public void addResourceRequest(SamzaResourceRequest request) { |
| synchronized (lock) { |
| if (request.getRequestTimestamp().isAfter(Instant.now())) { |
| delayedRequestsQueue.add(request); |
| return; |
| } |
| |
| // Send request immediately if the request timestamp is not in the future. |
| sendResourceRequest(request); |
| } |
| } |
| |
| /** |
| * Cancels a {@link SamzaResourceRequest} previously submitted to the {@link ClusterResourceManager} |
| * |
| * @param request {@link SamzaResourceRequest} to cancel |
| */ |
| public void cancelResourceRequest(SamzaResourceRequest request) { |
| log.info("Canceling resource request for Processor ID: {} on host: {}", request.getProcessorId(), request.getPreferredHost()); |
| synchronized (lock) { |
| delayedRequestsQueue.remove(request); |
| requestsQueue.remove(request); |
| if (hostAffinityEnabled) { |
| // assignedHost may not always be the preferred host. |
| // Hence, we should safely decrement the counter for the preferredHost |
| hostRequestCounts.get(request.getPreferredHost()).decrementAndGet(); |
| } |
| manager.cancelResourceRequest(request); |
| } |
| } |
| |
| /** |
| * Invoked each time a resource is returned from a {@link ClusterResourceManager}. |
| * @param samzaResource The resource that was returned from the {@link ClusterResourceManager} |
| */ |
| public void addResource(SamzaResource samzaResource) { |
| synchronized (lock) { |
| String containerId = samzaResource.getContainerId(); |
| if (hostAffinityEnabled) { |
| String hostName = samzaResource.getHost(); |
| AtomicInteger requestCount = hostRequestCounts.get(hostName); |
| // Check if this host was requested for any of the resources |
| if (requestCount == null || requestCount.get() == 0) { |
| log.info("Saving Container ID: {} in the buffer for ANY_HOST since its host: {} has not been requested specifically.", |
| containerId, hostName); |
| addToAllocatedResourceList(ANY_HOST, samzaResource); |
| } else { |
| // This host was indeed requested. |
| int requestCountOnThisHost = requestCount.get(); |
| List<SamzaResource> allocatedResourcesOnThisHost = allocatedResources.get(hostName); |
| if (requestCountOnThisHost > 0) { |
| //there are pending requests for resources on this host. |
| if (allocatedResourcesOnThisHost == null || allocatedResourcesOnThisHost.size() < requestCountOnThisHost) { |
| log.info("Saving Container ID: {} in the buffer for host: {}.", containerId, hostName); |
| addToAllocatedResourceList(hostName, samzaResource); |
| } else { |
| /** |
| * The RM may allocate more containers on a given host than requested. In such a case, even though the |
| * requestCount != 0, it will be greater than the total request count for that host. Hence, it should be |
| * assigned to ANY_HOST |
| */ |
| log.info("Saving Container ID: {} in the buffer for ANY_HOST since the number of containers already " + |
| "allocated on its host: {} ({}) is greater than what has been requested: {}.", |
| containerId, hostName, requestCountOnThisHost, containerId); |
| addToAllocatedResourceList(ANY_HOST, samzaResource); |
| } |
| } |
| } |
| } else { |
| log.info("Saving Container ID: {} in the buffer for ANY_HOST since host affinity is not enabled.", containerId); |
| addToAllocatedResourceList(ANY_HOST, samzaResource); |
| } |
| } |
| } |
| |
| // Appends a samzaResource to the list of allocated resources |
| private void addToAllocatedResourceList(String host, SamzaResource samzaResource) { |
| List<SamzaResource> samzaResources = allocatedResources.get(host); |
| if (samzaResources != null) { |
| samzaResources.add(samzaResource); |
| } else { |
| samzaResources = new ArrayList<>(); |
| samzaResources.add(samzaResource); |
| allocatedResources.put(host, samzaResources); |
| } |
| } |
| |
| /** |
| * This method updates the state after a request is fulfilled and a resource starts running on a host |
| * Needs to be synchronized because the state buffers are populated by the AMRMCallbackHandler, whereas it is |
| * drained by the allocator thread |
| * |
| * @param request {@link SamzaResourceRequest} that was fulfilled |
| * @param assignedHost Host to which the samzaResource was assigned |
| * @param samzaResource Allocated samzaResource resource that was used to satisfy this request |
| */ |
| public void updateStateAfterAssignment(SamzaResourceRequest request, String assignedHost, SamzaResource samzaResource) { |
| synchronized (lock) { |
| requestsQueue.remove(request); |
| // A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer. |
| if (allocatedResources.get(assignedHost) != null) { |
| allocatedResources.get(assignedHost).remove(samzaResource); |
| } |
| if (allocatedResources.get(ANY_HOST) != null) { |
| allocatedResources.get(ANY_HOST).remove(samzaResource); |
| } |
| if (hostAffinityEnabled) { |
| // assignedHost may not always be the preferred host. |
| // Hence, we should safely decrement the counter for the preferredHost |
| hostRequestCounts.get(request.getPreferredHost()).decrementAndGet(); |
| } |
| // To avoid getting back excess resources |
| manager.cancelResourceRequest(request); |
| } |
| } |
| |
| /** |
| * Sends the {@link SamzaResourceRequest}s in the delayed requests queue that have expired. |
| * @return number of delayed requests sent. |
| */ |
| public int sendPendingDelayedResourceRequests() { |
| synchronized (lock) { |
| int numMoved = 0; |
| Instant now = Instant.now(); |
| while (!delayedRequestsQueue.isEmpty() && delayedRequestsQueue.peek().getRequestTimestamp().isBefore(now)) { |
| sendResourceRequest(delayedRequestsQueue.poll()); |
| numMoved++; |
| } |
| return numMoved; |
| } |
| } |
| |
| /** |
| * If requestQueue is empty, all extra resources in the buffer should be released and update the entire system's state |
| * Needs to be synchronized because it is modifying shared state buffers |
| * @return the number of resources released. |
| */ |
| public int releaseExtraResources() { |
| synchronized (lock) { |
| int numReleasedResources = 0; |
| if (requestsQueue.isEmpty()) { |
| log.debug("Resource Requests Queue is empty."); |
| if (hostAffinityEnabled) { |
| List<String> allocatedHosts = getAllocatedHosts(); |
| for (String host : allocatedHosts) { |
| log.info("Releasing extra resources on host: {}", host); |
| numReleasedResources += releaseResourcesForHost(host); |
| } |
| } else { |
| numReleasedResources += releaseResourcesForHost(ANY_HOST); |
| } |
| clearState(); |
| } |
| return numReleasedResources; |
| } |
| } |
| |
| /** |
| * Releases a single resource based on containerId. |
| * @param containerId Yarn container ID |
| */ |
| public void releaseResource(String containerId) { |
| if (StringUtils.isEmpty(containerId)) { |
| log.warn("ContainerId is not specified"); |
| return; |
| } |
| |
| synchronized (lock) { |
| allocatedResources.values().forEach(resources -> { |
| if (resources != null) { |
| resources.removeIf(r -> containerId.equals(r.getContainerId())); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Releases a container that was allocated and assigned but could not be started. |
| * e.g. because of a ConnectException while trying to communicate with the NM. |
| * This method assumes the specified container and associated request have already |
| * been removed from their respective queues. |
| * |
| * @param resource the {@link SamzaResource} to release. |
| */ |
| public void releaseUnstartableContainer(SamzaResource resource, String preferredHost) { |
| synchronized (lock) { |
| String containerId = resource.getContainerId(); |
| log.info("Releasing unstartable Container ID: {} on host: {}", containerId, resource.getHost()); |
| manager.releaseResources(resource); |
| |
| // A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer. |
| if (allocatedResources.get(preferredHost) != null) { |
| allocatedResources.get(preferredHost).remove(resource); |
| log.info("Unstartable Container ID: {} removed from the buffer for host: {}", containerId, preferredHost); |
| } |
| if (allocatedResources.get(ANY_HOST) != null) { |
| allocatedResources.get(ANY_HOST).remove(resource); |
| log.info("Unstartable Container ID: {} removed from the buffer for ANY_HOST", containerId); |
| } |
| } |
| } |
| |
| /** |
| * Sends the request to the {@link ClusterResourceManager} while queuing the request to be matched with the returned |
| * {@link SamzaResource}. Caller must call this in a synchronized block. |
| * @param request to be sent. |
| */ |
| @VisibleForTesting |
| void sendResourceRequest(SamzaResourceRequest request) { |
| requestsQueue.add(request); |
| String preferredHost = request.getPreferredHost(); |
| |
| // if host affinity is enabled, update state. |
| if (hostAffinityEnabled) { |
| //increment # of requests on the host. |
| if (hostRequestCounts.containsKey(preferredHost)) { |
| hostRequestCounts.get(preferredHost).incrementAndGet(); |
| } else { |
| hostRequestCounts.put(preferredHost, new AtomicInteger(1)); |
| } |
| /** |
| * The following is important to correlate allocated resource data with the requestsQueue made before. If |
| * the preferredHost is requested for the first time, the state should reflect that the allocatedResources |
| * list is empty and NOT null. |
| */ |
| |
| if (!allocatedResources.containsKey(preferredHost)) { |
| allocatedResources.put(preferredHost, new ArrayList<>()); |
| } |
| } |
| manager.requestResources(request); |
| } |
| |
| /** |
| * Releases all allocated resources for the specified host. |
| * @param host the host for which the resources should be released. |
| * @return the number of resources released. |
| */ |
| private int releaseResourcesForHost(String host) { |
| int numReleasedResources = 0; |
| List<SamzaResource> samzaResources = allocatedResources.get(host); |
| if (samzaResources != null) { |
| for (SamzaResource resource : samzaResources) { |
| log.info("Releasing Container ID: {} on host: {}", resource.getContainerId(), host); |
| manager.releaseResources(resource); |
| numReleasedResources++; |
| } |
| } |
| return numReleasedResources; |
| } |
| |
| |
| /** |
| * Clears all the state variables |
| * Performed when there are no more unfulfilled requests |
| */ |
| private void clearState() { |
| allocatedResources.clear(); |
| hostRequestCounts.clear(); |
| requestsQueue.clear(); |
| } |
| |
| /** |
| * Returns the list of hosts which has at least 1 allocated Resource in the buffer |
| * @return list of host names |
| */ |
| private List<String> getAllocatedHosts() { |
| List<String> hostKeys = new ArrayList<>(); |
| for (Map.Entry<String, List<SamzaResource>> entry: allocatedResources.entrySet()) { |
| if (entry.getValue().size() > 0) { |
| hostKeys.add(entry.getKey()); |
| } |
| } |
| return hostKeys; |
| } |
| |
| /** |
| * Retrieves, but does not remove, the first allocated resource on the specified host. |
| * |
| * @param host the host for which a resource is needed. |
| * @return a {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one. |
| */ |
| public SamzaResource peekResource(String host) { |
| synchronized (lock) { |
| List<SamzaResource> resourcesOnPreferredHostBuffer = this.allocatedResources.get(host); |
| List<SamzaResource> resourcesOnAnyHostBuffer = this.allocatedResources.get(ANY_HOST); |
| |
| // First search for the preferred host (may be ANY_HOST too) |
| if (resourcesOnPreferredHostBuffer != null && !resourcesOnPreferredHostBuffer.isEmpty()) { |
| SamzaResource resource = resourcesOnPreferredHostBuffer.get(0); |
| log.info("Found Container ID: {} for host: {} in the buffer.", resource.getContainerId(), host); |
| return resource; |
| } else if (resourcesOnAnyHostBuffer != null && !resourcesOnAnyHostBuffer.isEmpty()) { |
| // If no resources for preferred host int he buffer, look for ANY_HOST |
| log.debug("No resources on preferred host buffer. Scanning ANY_HOST buffer"); |
| SamzaResource resource = resourcesOnAnyHostBuffer.stream() |
| .filter(resrc -> resrc.getHost().equals(host)) |
| .findAny().orElse(null); |
| if (resource != null) { |
| log.info("Found Container ID: {} for host: {} in the buffer.", resource.getContainerId(), host); |
| } |
| return resource; |
| } else { |
| log.debug("Could not find any containers on host: {}. Both preferred host and ANY_HOST buffers are empty", host); |
| return null; |
| } |
| } |
| } |
| |
| /** |
| * Retrieves, but does not remove, the next pending request in the queue. |
| * |
| * @return the pending request or {@code null} if there is no pending request. |
| */ |
| public SamzaResourceRequest peekPendingRequest() { |
| synchronized (lock) { |
| return requestsQueue.peek(); |
| } |
| } |
| |
| /** |
| * Returns the number of pending SamzaResource requests in the queue. |
| * @return the number of pending requests |
| */ |
| public int numPendingRequests() { |
| synchronized (lock) { |
| return requestsQueue.size(); |
| } |
| } |
| |
| /** |
| * Returns the number of delayed SamzaResource requests in the queue. |
| * @return the number of delayed requests |
| */ |
| public int numDelayedRequests() { |
| synchronized (lock) { |
| return delayedRequestsQueue.size(); |
| } |
| } |
| |
| /** |
| * Returns the list of resources allocated on a given host. If no resources were ever allocated on |
| * the given host, it returns null. This method makes a defensive shallow copy. A shallow copy is |
| * sufficient because the SamzaResource class does not expose setters. |
| * |
| * @param host hostname |
| * @return list of resources allocated on the given host, or null |
| */ |
| public List<SamzaResource> getResourcesOnAHost(String host) { |
| synchronized (lock) { |
| List<SamzaResource> samzaResourceList = allocatedResources.get(host); |
| if (samzaResourceList == null) |
| return null; |
| |
| return new ArrayList<>(samzaResourceList); |
| } |
| } |
| |
| // Package private, used only in tests. |
| @VisibleForTesting |
| Map<String, AtomicInteger> getHostRequestCounts() { |
| return Collections.unmodifiableMap(hostRequestCounts); |
| } |
| |
| @VisibleForTesting |
| DelayedRequestQueue getDelayedRequestsQueue() { |
| return delayedRequestsQueue; |
| } |
| |
| static class DelayedRequestQueue extends PriorityQueue<SamzaResourceRequest> { |
| DelayedRequestQueue() { |
| super(Comparator.comparingLong(request -> request.getRequestTimestamp().toEpochMilli())); |
| } |
| } |
| } |