// blob: 6850220fffbb9ee90f7ce3d84ef5c6a8f32e3878 [file] [log] [blame]
package org.apache.helix.task;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.task.assigner.TaskAssignResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A TaskAssignmentCalculator for when a task group must be assigned according to partitions/states
* on a target
* resource. Here, tasks are co-located according to where a resource's partitions are, as well as
* (if desired) only where those partitions are in a given state.
public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
private static final Logger LOG =
private AssignableInstanceManager _assignableInstanceManager;
* Default constructor. Because of quota-based scheduling support, we need
* AssignableInstanceManager. This constructor should not be used.
public FixedTargetTaskAssignmentCalculator() {
* Constructor for FixedTargetTaskAssignmentCalculator. Requires AssignableInstanceManager for
* "charging" resources per task.
* @param assignableInstanceManager
public FixedTargetTaskAssignmentCalculator(AssignableInstanceManager assignableInstanceManager) {
_assignableInstanceManager = assignableInstanceManager;
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Map<String, IdealState> idealStateMap) {
IdealState tgtIs = idealStateMap.get(jobCfg.getTargetResource());
return getAllTaskPartitions(tgtIs, jobCfg, jobCtx);
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
return getTaskAssignment(currStateOutput, instances, jobCfg, jobContext, workflowCfg,
workflowCtx, partitionSet, idealStateMap);
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
Map<String, IdealState> idealStateMap) {
return computeAssignmentAndChargeResource(currStateOutput, instances, workflowCfg, jobCfg,
jobContext, partitionSet, idealStateMap);
* Returns the set of all partition ids for a job.
* If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
* use the list of all partition ids from the target resource.
* return empty set if target resource does not exist.
private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, JobConfig jobCfg,
JobContext taskCtx) {
Map<String, List<Integer>> currentTargets = taskCtx.getPartitionsByTarget();
SortedSet<String> targetPartitions = Sets.newTreeSet();
if (jobCfg.getTargetPartitions() != null) {
} else {
if (tgtResourceIs != null) {
} else {
LOG.warn("Missing target resource for the scheduled job {}!",
taskCtx != null ? taskCtx.getName() : "null");
Set<Integer> taskPartitions = Sets.newTreeSet();
for (String targetPartition : targetPartitions) {
.addAll(getPartitionsForTargetPartition(targetPartition, currentTargets, taskCtx));
return taskPartitions;
public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext, Set<Integer> allPartitions) {
return jobContext.getPartitionSet().stream().filter(partition -> !allPartitions.contains(partition)).collect(
private static List<Integer> getPartitionsForTargetPartition(String targetPartition,
Map<String, List<Integer>> currentTargets, JobContext jobCtx) {
if (!currentTargets.containsKey(targetPartition)) {
int nextId = jobCtx.getPartitionSet().size();
jobCtx.setPartitionTarget(nextId, targetPartition);
return Lists.newArrayList(nextId);
} else {
return currentTargets.get(targetPartition);
* NOTE: this method has been deprecated due to the addition of quota-based task scheduling.
* Get partition assignments for the target resource, but only for the partitions of interest.
* @param currStateOutput The current state of the instances in the cluster.
* @param instances The instances.
* @param tgtIs The ideal state of the target resource.
* @param tgtStates Only partitions in this set of states will be considered. If null, partitions
* do not need to
* be in any specific state to be considered.
* @param includeSet The set of partitions to consider.
* @return A map of instance vs set of partition ids assigned to that instance.
private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : instances) {
result.put(instance, new TreeSet<Integer>());
Map<String, List<Integer>> partitionsByTarget = jobCtx.getPartitionsByTarget();
for (String pName : tgtIs.getPartitionSet()) {
List<Integer> partitions = partitionsByTarget.get(pName);
if (partitions == null || partitions.size() < 1) {
int pId = partitions.get(0);
if (includeSet.contains(pId)) {
for (String instance : instances) {
Message pendingMessage = currStateOutput.getPendingMessage(tgtIs.getResourceName(),
new Partition(pName), instance);
if (pendingMessage != null) {
String s = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
if (s != null && (tgtStates == null || tgtStates.contains(s))) {
return result;
* Calculate the assignment for given tasks. This assignment also charges resources for each task
* and takes resource/quota availability into account while assigning.
* @param currStateOutput
* @param liveInstances
* @param jobCfg
* @param jobContext
* @param taskPartitionSet
* @param idealStateMap
* @return instance -> set of task partition numbers
private Map<String, SortedSet<Integer>> computeAssignmentAndChargeResource(
CurrentStateOutput currStateOutput, Collection<String> liveInstances,
WorkflowConfig workflowCfg, JobConfig jobCfg, JobContext jobContext,
Set<Integer> taskPartitionSet, Map<String, IdealState> idealStateMap) {
// Note: targeted jobs also take up capacity in quota-based scheduling
// "Charge" resources for the tasks
String quotaType = getQuotaType(workflowCfg, jobCfg);
// IdealState of the target resource
IdealState targetIdealState = idealStateMap.get(jobCfg.getTargetResource());
if (targetIdealState == null) {
LOG.warn("Missing target resource for the scheduled job {}!",
jobContext != null ? jobContext.getName() : "null");
return Collections.emptyMap();
// The "states" you want to assign to. Assign to the partitions of the target resource if these
// partitions are in one of these states
Set<String> targetStates = jobCfg.getTargetPartitionStates();
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : liveInstances) {
result.put(instance, new TreeSet<Integer>());
// <Target resource partition name -> list of task partition numbers> mapping
Map<String, List<Integer>> partitionsByTarget = jobContext.getPartitionsByTarget();
for (String targetResourcePartitionName : targetIdealState.getPartitionSet()) {
// Get all task partition numbers to be assigned to this targetResource partition
List<Integer> taskPartitions = partitionsByTarget.get(targetResourcePartitionName);
if (taskPartitions == null || taskPartitions.size() < 1) {
continue; // No tasks to assign, skip
// Get one task to be assigned to this targetResource partition
int targetPartitionId = taskPartitions.get(0);
// First, see if that task needs to be assigned at this time
if (taskPartitionSet.contains(targetPartitionId)) {
for (String instance : liveInstances) {
// See if there is a pending message on this instance on this partition for the target
// resource
// If there is, we should wait until the pending message gets processed, so skip
// assignment this time around
Message pendingMessage =
new Partition(targetResourcePartitionName), instance);
if (pendingMessage != null) {
// See if there a partition exists on this instance
String currentState = currStateOutput.getCurrentState(targetIdealState.getResourceName(),
new Partition(targetResourcePartitionName), instance);
if (currentState != null
&& (targetStates == null || targetStates.contains(currentState))) {
// Prepare pName and taskConfig for assignment
String pName = String.format("%s_%s", jobCfg.getJobId(), targetPartitionId);
if (!jobCfg.getTaskConfigMap().containsKey(pName)) {
new TaskConfig(null, null, pName, targetResourcePartitionName));
TaskConfig taskConfig = jobCfg.getTaskConfigMap().get(pName);
// On LiveInstance change, RUNNING or other non-terminal tasks might get re-assigned. If
// the new assignment differs from prevAssignment, release. If the assigned instances
// from old and new assignments are the same, then do nothing and let it keep running
// The following checks if two assignments (old and new) differ
// First, check if this taskPartition has been ever assigned before by checking
// jobContext's AssignedParticipant field
String prevAssignedInstance = jobContext.getAssignedParticipant(targetPartitionId);
TaskPartitionState taskState = jobContext.getPartitionState(targetPartitionId);
if (prevAssignedInstance != null && taskState != null
&& (taskState.equals(TaskPartitionState.INIT)
|| taskState.equals(TaskPartitionState.RUNNING))) {
// If the task is in active state and old and new assignments are different, we need
// to release from prevInstance, and this task will be assigned to a different
// instance
if (!prevAssignedInstance.equals(instance)) {
if (_assignableInstanceManager.getAssignableInstanceNames()
.contains(prevAssignedInstance)) {
_assignableInstanceManager.release(prevAssignedInstance, taskConfig, quotaType);
} else {
// This instance must be no longer live
"Task {} was reassigned from old instance: {} to new instance: {}. However, old instance: {} is not found in AssignableInstanceMap. The old instance is possibly no longer a LiveInstance. This task will not be released.",
pName, prevAssignedInstance, instance, prevAssignedInstance);
} else {
// Old and new assignments are the same, so just skip assignment for this
// taskPartition so that it can just keep running
// Actual assignment logic: try to charge resources first and assign if successful
if (_assignableInstanceManager.getAssignableInstanceNames().contains(instance)) {
// Try to assign first
TaskAssignResult taskAssignResult =
_assignableInstanceManager.tryAssign(instance, taskConfig, quotaType);
if (taskAssignResult.isSuccessful()) {
// There exists a partition, the states match up, and tryAssign successful. Assign!
_assignableInstanceManager.assign(instance, taskAssignResult);
// To prevent double assign of the tasks on other replicas of the targetResource
// partition
} else if ((!taskAssignResult.isSuccessful() && taskAssignResult
.getFailureReason() == TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED)) {
// In case of double assign, we can still include it in the assignment because
// RUNNING->RUNNING message will just be ignored by the participant
// AssignableInstance should already have it assigned, so do not double-charge
} else {
"Unable to assign the task to this AssignableInstance. Skipping this instance. Task: {}, Instance: {}, TaskAssignResult: {}",
pName, instance, taskAssignResult);
} else {
"AssignableInstance does not exist for this LiveInstance: {}. This should never happen! Will not assign to this instance.",
return result;