blob: fb8d869e56dbdc40dd8e9a0b4d6caf0c7dabbdf0 [file] [log] [blame]
package org.apache.helix.task.assigner;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadCountBasedTaskAssigner implements TaskAssigner {
private static final Logger logger = LoggerFactory.getLogger(ThreadCountBasedTaskAssigner.class);
private static final int SCHED_QUEUE_INIT_CAPACITY = 200;
private AssignableInstanceManager _assignableInstanceManager;
/**
* Assigns given tasks to given AssignableInstances assuming the DEFAULT quota type for all tasks.
* @param assignableInstances AssignableInstances
* @param tasks TaskConfigs of the same quota type
* @return taskID -> TaskAssignmentResult mappings
*/
public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
Iterable<TaskConfig> tasks) {
return assignTasks(assignableInstances, tasks, AssignableInstance.DEFAULT_QUOTA_TYPE);
}
/**
* This is a simple task assigning algorithm that uses the following assumptions to achieve
* efficiency in assigning tasks:
* 1. All tasks have same quota type
* 2. All tasks only need 1 thread for assignment, no other things to consider
* The algorithm ensures the spread-out of tasks with same quota type or tasks from same job, with
* best effort.
* NOTE: once we have more things to consider during scheduling, we will need to come up with
* a more generic task assignment algorithm.
* @param assignableInstances AssignableInstances
* @param tasks TaskConfigs of the same quota type
* @param quotaType quota type of the tasks
* @return taskID -> TaskAssignmentResult mappings
*/
@Override
public Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
Iterable<TaskConfig> tasks, String quotaType) {
throw new UnsupportedOperationException();
}
@Override
public Map<String, TaskAssignResult> assignTasks(
AssignableInstanceManager assignableInstanceManager, Collection<String> instances,
Iterable<TaskConfig> tasks, String quotaType) {
Set<AssignableInstance> assignableInstances = new HashSet<>();
// Only add the AssignableInstances that are also in instances
for (String instance : instances) {
assignableInstances.add(assignableInstanceManager.getAssignableInstance(instance));
}
if (tasks == null || !tasks.iterator().hasNext()) {
return Collections.emptyMap();
}
if (assignableInstances.isEmpty()) {
return buildNoInstanceAssignment(tasks, quotaType);
}
if (quotaType == null || quotaType.equals("") || quotaType.equals("null")) {
// Sometimes null is stored as a String literal
logger.warn("Quota type is null. Assigning it as DEFAULT type!");
quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
}
logger.info("Assigning tasks with quota type {}", quotaType);
// Build a sched queue
PriorityQueue<AssignableInstance> queue = buildSchedQueue(quotaType, assignableInstances);
// Assign
Map<String, TaskAssignResult> assignResults = new HashMap<>();
for (TaskConfig task : tasks) {
// Dedup
if (assignResults.containsKey(task.getId())) {
logger.warn("Duplicated task assignment {}", task);
continue;
}
// Try to assign the task to least used instance
AssignableInstance instance = queue.poll();
TaskAssignResult result = instance.tryAssign(task, quotaType);
assignResults.put(task.getId(), result);
if (result.isSuccessful()) {
// If the task is successfully accepted by the instance, assign it to the instance
assignableInstanceManager.assign(instance.getInstanceName(), result);
}
// requeue the instance to rank again
queue.offer(instance);
}
logger.info("Finished assigning tasks with quota type {}", quotaType);
return assignResults;
}
private PriorityQueue<AssignableInstance> buildSchedQueue(String quotaType,
Iterable<AssignableInstance> instances) {
AssignableInstanceComparator comparator = new AssignableInstanceComparator(quotaType);
PriorityQueue<AssignableInstance> queue =
new PriorityQueue<>(SCHED_QUEUE_INIT_CAPACITY, comparator);
for (AssignableInstance assignableInstance : instances) {
queue.offer(assignableInstance);
}
return queue;
}
private Map<String, TaskAssignResult> buildNoInstanceAssignment(Iterable<TaskConfig> tasks,
String quotaType) {
Map<String, TaskAssignResult> result = new HashMap<>();
for (TaskConfig taskConfig : tasks) {
result.put(taskConfig.getId(), new TaskAssignResult(taskConfig, quotaType, null, false, 0,
TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, "No assignable instance to assign"));
}
return result;
}
private static class AssignableInstanceComparator implements Comparator<AssignableInstance> {
/**
* Resource type this comparator needs to compare
*/
private final String RESOURCE_TYPE = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
/**
* Resource quota type this comparator needs to compare
*/
private final String _quotaType;
public AssignableInstanceComparator(String quotaType) {
_quotaType = quotaType;
}
/**
* Using this comparator, AssignableInstance will be sorted based on availability of
* quota given job type in the priority queue. Top of the queue will be the one with
* highest priority
* @return a negative integer, zero, or a positive integer as the
* first argument is less than, equal to, or greater than the
* second
*/
@Override
public int compare(AssignableInstance o1, AssignableInstance o2) {
Integer o1RemainingCapacity = getRemainingUsage(o1.getTotalCapacity(), o1.getUsedCapacity());
Integer o2RemainingCapacity = getRemainingUsage(o2.getTotalCapacity(), o2.getUsedCapacity());
return o2RemainingCapacity - o1RemainingCapacity;
}
private Integer getRemainingUsage(Map<String, Map<String, Integer>> capacity,
Map<String, Map<String, Integer>> used) {
if (capacity.containsKey(RESOURCE_TYPE)) {
String quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
if (capacity.get(RESOURCE_TYPE).containsKey(_quotaType)) {
// If the quotaType is not supported, sort as DEFAULT because it will be assigned as
// DEFAULT
quotaType = _quotaType;
}
return capacity.get(RESOURCE_TYPE).get(quotaType) - used.get(RESOURCE_TYPE).get(quotaType);
}
return 0;
}
}
public void init(AssignableInstanceManager assignableInstanceManager) {
_assignableInstanceManager = assignableInstanceManager;
}
}