| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.app.rm; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; |
| import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Keeps the data for RMContainer's reuse. |
| */ |
| public class RMContainerReuseRequestor extends RMContainerRequestor |
| implements EventHandler<ContainerAvailableEvent> { |
| private static final Log LOG = LogFactory |
| .getLog(RMContainerReuseRequestor.class); |
| |
| private Map<Container, String> containersToReuse = |
| new ConcurrentHashMap<Container, String>(); |
| private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap = |
| new HashMap<ContainerId, List<TaskAttemptId>>(); |
| private int containerReuseMaxMapTasks; |
| private int containerReuseMaxReduceTasks; |
| private int maxMapTaskContainers; |
| private int maxReduceTaskContainers; |
| private int noOfMapTaskContainersForReuse; |
| private int noOfReduceTaskContainersForReuse; |
| |
| private RMCommunicator rmCommunicator; |
| |
| public RMContainerReuseRequestor( |
| EventHandler<ContainerAvailableEvent> eventHandler, |
| RMCommunicator rmCommunicator) { |
| super(rmCommunicator); |
| this.rmCommunicator = rmCommunicator; |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| super.serviceInit(conf); |
| containerReuseMaxMapTasks = conf.getInt( |
| MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, |
| MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKS); |
| containerReuseMaxReduceTasks = conf.getInt( |
| MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, |
| MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS); |
| maxMapTaskContainers = conf.getInt( |
| MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, |
| MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS); |
| maxReduceTaskContainers = conf.getInt( |
| MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, |
| MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| super.serviceStart(); |
| } |
| |
| @Override |
| public AllocateResponse makeRemoteRequest() |
| throws YarnException, IOException { |
| AllocateResponse amResponse = super.makeRemoteRequest(); |
| synchronized (containersToReuse) { |
| List<Container> allocatedContainers = amResponse.getAllocatedContainers(); |
| allocatedContainers.addAll(containersToReuse.keySet()); |
| containersToReuse.clear(); |
| } |
| return amResponse; |
| } |
| |
| @Override |
| public void containerFailedOnHost(String hostName) { |
| super.containerFailedOnHost(hostName); |
| boolean blacklisted = super.isNodeBlacklisted(hostName); |
| if (blacklisted) { |
| Set<Container> containersOnHost = new HashSet<Container>(); |
| for (Entry<Container, String> elem : containersToReuse.entrySet()) { |
| if (elem.getValue().equals(hostName)) { |
| containersOnHost.add(elem.getKey()); |
| } |
| } |
| for (Container container : containersOnHost) { |
| containersToReuse.remove(container); |
| } |
| } |
| } |
| |
| @Override |
| public void handle(ContainerAvailableEvent event) { |
| Container container = event.getContainer(); |
| ContainerId containerId = container.getId(); |
| String resourceName = container.getNodeId().getHost(); |
| boolean canReuse = false; |
| Priority priority = container.getPriority(); |
| if (RMContainerAllocator.PRIORITY_MAP.equals(priority) |
| || RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) { |
| List<TaskAttemptId> containerTaskAttempts = null; |
| containerTaskAttempts = containerToTaskAttemptsMap.get(containerId); |
| if (containerTaskAttempts == null) { |
| containerTaskAttempts = new ArrayList<TaskAttemptId>(); |
| containerToTaskAttemptsMap.put(containerId, containerTaskAttempts); |
| } |
| if (checkMapContainerReuseConstraints(priority, containerTaskAttempts) |
| || checkReduceContainerReuseConstraints(priority, |
| containerTaskAttempts)) { |
| Map<String, Map<Resource, ResourceRequest>> resourceRequests = |
| remoteRequestsTable.get(priority); |
| // If there are any eligible requests |
| if (resourceRequests != null && !resourceRequests.isEmpty()) { |
| canReuse = true; |
| containerTaskAttempts.add(event.getTaskAttemptId()); |
| } |
| } |
| ((RMContainerAllocator) rmCommunicator) |
| .resetContainerForReuse(container.getId()); |
| if (canReuse) { |
| containersToReuse.put(container, resourceName); |
| incrementRunningReuseContainers(priority); |
| LOG.info("Adding the " + containerId + " for reuse."); |
| } else { |
| LOG.info("Releasing the container : " + containerId |
| + " since it is not eligible for reuse or no pending requests."); |
| containerComplete(container); |
| pendingRelease.add(containerId); |
| release(containerId); |
| } |
| } |
| } |
| |
| private boolean checkMapContainerReuseConstraints(Priority priority, |
| List<TaskAttemptId> containerTaskAttempts) { |
| return RMContainerAllocator.PRIORITY_MAP.equals(priority) |
| // Check for how many tasks can map task container run maximum |
| && ((containerTaskAttempts.size() < containerReuseMaxMapTasks |
| || containerReuseMaxMapTasks == -1) |
| // Check for no of map task containers running |
| && (noOfMapTaskContainersForReuse < maxMapTaskContainers |
| || maxMapTaskContainers == -1)); |
| } |
| |
| private boolean checkReduceContainerReuseConstraints(Priority priority, |
| List<TaskAttemptId> containerTaskAttempts) { |
| return RMContainerAllocator.PRIORITY_REDUCE.equals(priority) |
| // Check for how many tasks can reduce task container run maximum |
| && ((containerTaskAttempts.size() < containerReuseMaxReduceTasks |
| || containerReuseMaxReduceTasks == -1) |
| // Check for no of reduce task containers running |
| && (noOfReduceTaskContainersForReuse < maxReduceTaskContainers |
| || maxReduceTaskContainers == -1)); |
| } |
| |
| private void containerComplete(Container container) { |
| if (!containerToTaskAttemptsMap.containsKey(container.getId())) { |
| return; |
| } |
| containerToTaskAttemptsMap.remove(container.getId()); |
| if (RMContainerAllocator.PRIORITY_MAP.equals(container.getPriority())) { |
| noOfMapTaskContainersForReuse--; |
| } else if (RMContainerAllocator.PRIORITY_REDUCE |
| .equals(container.getPriority())) { |
| noOfReduceTaskContainersForReuse--; |
| } |
| } |
| |
| private void incrementRunningReuseContainers(Priority priority) { |
| if (RMContainerAllocator.PRIORITY_MAP.equals(priority)) { |
| noOfMapTaskContainersForReuse++; |
| } else if (RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) { |
| noOfReduceTaskContainersForReuse++; |
| } |
| } |
| |
| @Private |
| @VisibleForTesting |
| Map<Container, String> getContainersToReuse() { |
| return containersToReuse; |
| } |
| |
| /** |
| * Container Available EventType. |
| */ |
| public static enum EventType { |
| CONTAINER_AVAILABLE |
| } |
| } |