blob: ac87af282312788cec6111a4450e02a9324d768e [file] [log] [blame]
package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.task.assigner.AssignableInstance;
public abstract class TaskAssignmentCalculator {
/**
* Get all the partitions/tasks that belong to this job.
* @param jobCfg the task configuration
* @param jobCtx the task context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param idealStateMap the map of resource name map to ideal state
* @return set of partition numbers
*/
public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Map<String, IdealState> idealStateMap);
/**
* Compute an assignment of tasks to instances
* @param currStateOutput the current state of the instances
* @param prevAssignment the previous task partition assignment
* @param instances the instances
* @param jobCfg the task configuration
* @param jobContext the task context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param partitionSet the partitions to assign
* @param idealStateMap the map of resource name map to ideal state
* @return map of instances to set of partition numbers
*/
@Deprecated
public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
Map<String, IdealState> idealStateMap);
/**
* Compute an assignment of tasks to instances
* @param currStateOutput the current state of the instances
* @param instances the instances
* @param jobCfg the task configuration
* @param jobContext the task context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param partitionSet the partitions to assign
* @param idealStateMap the map of resource name map to ideal state
* @return map of instances to set of partition numbers
*/
public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
CurrentStateOutput currStateOutput, Collection<String> instances, JobConfig jobCfg,
JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Set<Integer> partitionSet, Map<String, IdealState> idealStateMap);
/**
* Returns the correct type for this job. Note that if the parent workflow has a type, then all of
* its jobs will inherit the type from the workflow.
* @param workflowConfig
* @param jobConfig
* @return
*/
public static String getQuotaType(WorkflowConfig workflowConfig, JobConfig jobConfig) {
String workflowType = workflowConfig.getWorkflowType();
if (workflowType == null || workflowType.equals("")) {
// Workflow type is null, so we go by the job type
String jobType = jobConfig.getJobType();
if (jobType == null || jobType.equals("")) {
// Job type is null, so we use DEFAULT
return AssignableInstance.DEFAULT_QUOTA_TYPE;
}
return jobType;
}
return workflowType;
}
/**
* Returns the tasks that should be dropped either because the task has been removed from config
* in generic jobs or target resources IS does not have the target partition anymore
* @param jobConfig
* @param jobContext
*/
public Set<Integer> getRemovedPartitions(JobConfig jobConfig, JobContext jobContext,
Set<Integer> allPartitions) {
// Get all partitions existed in the context
Set<Integer> deletedPartitions = new HashSet<>();
// Check whether the tasks have been deleted from jobConfig
for (Integer partition : jobContext.getPartitionSet()) {
String partitionID = jobContext.getTaskIdForPartition(partition);
if (!jobConfig.getTaskConfigMap().containsKey(partitionID)) {
deletedPartitions.add(partition);
}
}
return deletedPartitions;
}
}