blob: 6727d9d5039077f0aedc3632573bb362eeb05509 [file] [log] [blame]
package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.task.assigner.TaskAssignResult;
import org.apache.helix.task.assigner.TaskAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * ThreadCountBasedTaskAssignmentCalculator is an implementation of TaskAssignmentCalculator. It
 * serves as a wrapper around ThreadCountBasedTaskAssigner so that it can be used in the existing
 * Task Framework Controller pipeline (WorkflowRebalancer and JobRebalancer).
 */
public class ThreadCountBasedTaskAssignmentCalculator extends TaskAssignmentCalculator {
  private static final Logger LOG =
      LoggerFactory.getLogger(ThreadCountBasedTaskAssignmentCalculator.class);

  // Both collaborators are supplied once at construction time and never reassigned.
  private final TaskAssigner _taskAssigner;
  private final AssignableInstanceManager _assignableInstanceManager;

  /**
   * Constructor for ThreadCountBasedTaskAssignmentCalculator. Requires an instance of
   * ThreadCountBasedTaskAssigner and the up-to-date AssignableInstanceManager.
   * @param taskAssigner the assigner that task placement is delegated to
   * @param assignableInstanceManager the manager handed to the assigner on every assignment call
   */
  public ThreadCountBasedTaskAssignmentCalculator(TaskAssigner taskAssigner,
      AssignableInstanceManager assignableInstanceManager) {
    _taskAssigner = taskAssigner;
    _assignableInstanceManager = assignableInstanceManager;
  }

  /**
   * Registers every task of the job that does not yet have a partition number in the JobContext
   * and returns the resulting partition set.
   * @param jobCfg the job configuration whose task configs are enumerated
   * @param jobCtx the job context holding (and receiving) the partition-to-taskId mappings
   * @param workflowCfg not used by this implementation
   * @param workflowCtx not used by this implementation
   * @param idealStateMap not used by this implementation
   * @return the partition set of the job context after the missing tasks have been registered
   */
  @Override
  public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
      WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
      Map<String, IdealState> idealStateMap) {
    Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
    Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
    for (TaskConfig taskCfg : taskMap.values()) {
      String taskId = taskCfg.getId();
      if (!taskIdMap.containsKey(taskId)) {
        // The next partition number is the current number of partitions. It is only computed
        // when a new mapping is actually needed, so already-mapped tasks cost nothing extra.
        int nextPartition = jobCtx.getPartitionSet().size();
        jobCtx.setTaskIdForPartition(nextPartition, taskId);
      }
    }
    return jobCtx.getPartitionSet();
  }

  /**
   * Delegates the assignment of the given partitions to the TaskAssigner and converts its result
   * into a map of instance name to the sorted set of partition numbers assigned to that instance.
   * Only successful assignment results are included. A job with a target resource is rejected with
   * an error log and an empty map.
   * @param currStateOutput not used by this implementation
   * @param prevAssignment not used by this implementation
   * @param instances the candidate instance names passed to the assigner
   * @param jobCfg the job configuration
   * @param jobContext the job context providing the taskId/partition mappings
   * @param workflowCfg the workflow configuration, used to determine the quota type
   * @param workflowCtx not used by this implementation
   * @param partitionSet the partition numbers that should be assigned
   * @param idealStateMap not used by this implementation
   * @return instance name to sorted partition numbers; empty if nothing could be assigned
   */
  @Override
  public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
      ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
      JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
      Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
    if (jobCfg.getTargetResource() != null) {
      LOG.error(
          "Target resource is not null, should call FixedTaskAssignmentCalculator, target resource : {}",
          jobCfg.getTargetResource());
      return new HashMap<>();
    }

    // Convert the filtered partitionSet (partition numbers) to TaskConfigs
    Iterable<TaskConfig> taskConfigs = getFilteredTaskConfigs(partitionSet, jobCfg, jobContext);

    // Get the quota type to assign tasks to
    String quotaType = getQuotaType(workflowCfg, jobCfg);

    // Assign tasks to AssignableInstances
    Map<String, TaskAssignResult> taskAssignResultMap =
        _taskAssigner.assignTasks(_assignableInstanceManager, instances, taskConfigs, quotaType);

    // TODO: Do this when Quota Manager is ready
    // Cache TaskAssignResultMap to prevent double-assign
    // This will be used in AbstractTaskDispatcher to release tasks that aren't actually
    // scheduled/throttled
    // _assignableInstanceManager.getTaskAssignResultMap().putAll(taskAssignResultMap);

    // Get TaskId->PartitionNumber mappings for conversion
    Map<String, Integer> taskIdPartitionMap = jobContext.getTaskIdPartitionMap();

    // Instantiate the result map that maps instance to set of task (partition) mappings
    Map<String, SortedSet<Integer>> taskAssignment = new HashMap<>();

    // Loop through all TaskAssignResults and convert the result to the format compliant to
    // TaskAssignmentCalculator's API
    for (TaskAssignResult taskAssignResult : taskAssignResultMap.values()) {
      if (!taskAssignResult.isSuccessful()) {
        continue;
      }
      String instanceName = taskAssignResult.getInstanceName();
      String taskId = taskAssignResult.getTaskConfig().getId();

      // Since return value contains SortedSet<Integer> which is a set of partition numbers, we
      // must convert the taskID (given in TaskAssignResult) to its corresponding partition
      // number using taskIdPartitionMap found in JobContext. A single lookup covers both a
      // missing key and a null value, so no unboxing of null can occur.
      Integer partitionNumberForTask = taskIdPartitionMap.get(taskId);
      if (partitionNumberForTask == null) {
        LOG.warn(
            "Task is not found in taskIdPartitionMap. Skipping this task! JobID: {}, TaskID: {}",
            jobCfg.getJobId(), taskId);
        continue;
      }

      SortedSet<Integer> partitionsForInstance = taskAssignment.get(instanceName);
      if (partitionsForInstance == null) {
        partitionsForInstance = new TreeSet<>();
        taskAssignment.put(instanceName, partitionsForInstance);
      }
      partitionsForInstance.add(partitionNumberForTask);
    }
    return taskAssignment;
  }

  /**
   * Returns TaskConfigs whose partition numbers (ids) are present in filteredPartitionNumbers. This
   * means that these tasks should have the state of INIT, RUNNING, or null. This function basically
   * converts partition numbers to corresponding TaskConfigs. Partitions for which no task id or no
   * TaskConfig can be resolved are skipped with a warning instead of contributing a null element.
   * @param filteredPartitionNumbers the partition numbers to convert
   * @param jobConfig the job configuration used to look up TaskConfigs by task id
   * @param jobContext the job context used to look up task ids by partition number
   * @return the resolved, non-null TaskConfigs
   */
  private Iterable<TaskConfig> getFilteredTaskConfigs(Set<Integer> filteredPartitionNumbers,
      JobConfig jobConfig, JobContext jobContext) {
    Set<TaskConfig> filteredTaskConfigs = new HashSet<>();
    for (int partitionNumber : filteredPartitionNumbers) {
      String taskId = jobContext.getTaskIdForPartition(partitionNumber);
      TaskConfig taskConfig = taskId == null ? null : jobConfig.getTaskConfig(taskId);
      if (taskConfig == null) {
        // A null element would be dereferenced later when the assignment results are converted.
        LOG.warn(
            "TaskConfig is not found for partition. Skipping this partition! JobID: {}, Partition: {}, TaskID: {}",
            jobConfig.getJobId(), partitionNumber, taskId);
        continue;
      }
      filteredTaskConfigs.add(taskConfig);
    }
    return filteredTaskConfigs;
  }
}