| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| |
| @Private |
| @VisibleForTesting |
| public class WorkflowPriorityMappingsManager { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(WorkflowPriorityMappingsManager.class); |
| |
| private static final String WORKFLOW_PART_SEPARATOR = ":"; |
| |
| private static final String WORKFLOW_SEPARATOR = ","; |
| |
| private CapacityScheduler scheduler; |
| |
| private CapacitySchedulerConfiguration conf; |
| |
| private boolean overrideWithPriorityMappings = false; |
| // Map of queue to a map of workflow ID to priority |
| private Map<String, Map<String, WorkflowPriorityMapping>> priorityMappings = |
| new HashMap<String, Map<String, WorkflowPriorityMapping>>(); |
| |
| public static class WorkflowPriorityMapping { |
| String workflowID; |
| String queue; |
| Priority priority; |
| |
| public WorkflowPriorityMapping(String workflowID, String queue, |
| Priority priority) { |
| this.workflowID = workflowID; |
| this.queue = queue; |
| this.priority = priority; |
| } |
| |
| public Priority getPriority() { |
| return this.priority; |
| } |
| |
| @Override |
| public int hashCode() { |
| return super.hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj instanceof WorkflowPriorityMapping) { |
| WorkflowPriorityMapping other = (WorkflowPriorityMapping) obj; |
| return (other.workflowID.equals(workflowID) && |
| other.queue.equals(queue) && |
| other.priority.equals(priority)); |
| } else { |
| return false; |
| } |
| } |
| |
| public String toString() { |
| return workflowID + WORKFLOW_PART_SEPARATOR + queue |
| + WORKFLOW_PART_SEPARATOR + priority.getPriority(); |
| } |
| } |
| |
| @VisibleForTesting |
| public void initialize(CapacityScheduler scheduler) throws IOException { |
| this.scheduler = scheduler; |
| this.conf = scheduler.getConfiguration(); |
| boolean overrideWithWorkflowPriorityMappings = |
| conf.getOverrideWithWorkflowPriorityMappings(); |
| LOG.info("Initialized workflow priority mappings, override: " |
| + overrideWithWorkflowPriorityMappings); |
| this.overrideWithPriorityMappings = overrideWithWorkflowPriorityMappings; |
| this.priorityMappings = getWorkflowPriorityMappings(); |
| } |
| |
| /** |
| * Get workflow ID to priority mappings for a queue. |
| * |
| * @return workflowID to priority mappings for a queue |
| */ |
| public Map<String, Map<String, WorkflowPriorityMapping>> |
| getWorkflowPriorityMappings() { |
| Map<String, Map<String, WorkflowPriorityMapping>> mappings = |
| new HashMap<String, Map<String, WorkflowPriorityMapping>>(); |
| |
| Collection<String> workflowMappings = conf.getWorkflowPriorityMappings(); |
| for (String workflowMapping : workflowMappings) { |
| WorkflowPriorityMapping mapping = |
| getWorkflowMappingFromString(workflowMapping); |
| if (mapping != null) { |
| if (!mappings.containsKey(mapping.queue)) { |
| mappings.put(mapping.queue, |
| new HashMap<String, WorkflowPriorityMapping>()); |
| } |
| mappings.get(mapping.queue).put(mapping.workflowID, mapping); |
| } |
| } |
| return mappings; |
| } |
| |
| private WorkflowPriorityMapping getWorkflowMappingFromString( |
| String mappingString) { |
| if (mappingString == null) { |
| return null; |
| } |
| String[] mappingArray = StringUtils |
| .getTrimmedStringCollection(mappingString, WORKFLOW_PART_SEPARATOR) |
| .toArray(new String[] {}); |
| if (mappingArray.length != 3 || mappingArray[0].length() == 0 |
| || mappingArray[1].length() == 0 || mappingArray[2].length() == 0) { |
| throw new IllegalArgumentException( |
| "Illegal workflow priority mapping " + mappingString); |
| } |
| WorkflowPriorityMapping mapping; |
| try { |
| mapping = new WorkflowPriorityMapping(mappingArray[0], mappingArray[1], |
| Priority.newInstance(Integer.parseInt(mappingArray[2]))); |
| } catch (NumberFormatException e) { |
| throw new IllegalArgumentException( |
| "Illegal workflow priority for mapping " + mappingString); |
| } |
| return mapping; |
| } |
| |
| public Priority getMappedPriority(String workflowID, CSQueue queue) { |
| // Recursively fetch the priority mapping for the given workflow tracing |
| // up the queue hierarchy until the first match. |
| if (queue.equals(scheduler.getRootQueue())) { |
| return null; |
| } |
| String queuePath = queue.getQueuePath(); |
| if (priorityMappings.containsKey(queuePath) |
| && priorityMappings.get(queuePath).containsKey(workflowID)) { |
| return priorityMappings.get(queuePath).get(workflowID).priority; |
| } else { |
| queue = queue.getParent(); |
| return getMappedPriority(workflowID, queue); |
| } |
| } |
| |
| public Priority mapWorkflowPriorityForApp(ApplicationId applicationId, |
| CSQueue queue, String user, Priority priority) throws YarnException { |
| if (overrideWithPriorityMappings) { |
| // Set the correct workflow priority |
| RMApp rmApp = scheduler.getRMContext().getRMApps().get(applicationId); |
| if (rmApp != null && rmApp.getApplicationTags() != null |
| && rmApp.getApplicationSubmissionContext() != null) { |
| String workflowTagPrefix = scheduler.getConf().get( |
| YarnConfiguration.YARN_WORKFLOW_ID_TAG_PREFIX, |
| YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX); |
| String workflowID = null; |
| for(String tag : rmApp.getApplicationTags()) { |
| if (tag.trim().startsWith(workflowTagPrefix)) { |
| workflowID = tag.trim().substring(workflowTagPrefix.length()); |
| } |
| } |
| if (workflowID != null && !workflowID.isEmpty() |
| && priorityMappings != null && priorityMappings.size() > 0) { |
| Priority mappedPriority = getMappedPriority(workflowID, queue); |
| if (mappedPriority != null) { |
| LOG.info("Application " + applicationId + " user " + user |
| + " workflow " + workflowID + " queue " + queue.getQueuePath() |
| + " mapping [" + priority + "] to [" + mappedPriority |
| + "] override " + overrideWithPriorityMappings); |
| |
| // If workflow ID exists in workflow mapping, change this |
| // application's priority to mapped value. Else, use queue |
| // default priority. |
| priority = mappedPriority; |
| priority = scheduler.checkAndGetApplicationPriority( |
| priority, UserGroupInformation.createRemoteUser(user), |
| queue.getQueuePath(), applicationId); |
| rmApp.getApplicationSubmissionContext().setPriority(priority); |
| ((RMAppImpl)rmApp).setApplicationPriority(priority); |
| } |
| } |
| } |
| } |
| return priority; |
| } |
| |
| public static String getWorkflowPriorityMappingStr( |
| List<WorkflowPriorityMapping> workflowPriorityMappings) { |
| if (workflowPriorityMappings == null) { |
| return ""; |
| } |
| List<String> workflowPriorityMappingStrs = new ArrayList<>(); |
| for (WorkflowPriorityMapping mapping : workflowPriorityMappings) { |
| workflowPriorityMappingStrs.add(mapping.toString()); |
| } |
| return StringUtils.join(WORKFLOW_SEPARATOR, workflowPriorityMappingStrs); |
| } |
| } |