package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
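/**
 * Dispatcher for Helix Task Framework jobs. On each controller pipeline run it consumes the
 * current states reported by participants, updates the workflow and job contexts, and produces a
 * ResourceAssignment mapping each task partition of a job to an instance and a target task state.
 */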
public class JobDispatcher extends AbstractTaskDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(JobDispatcher.class);
// Intermediate states (meaning they are not terminal states) for workflows and jobs
private static final Set<TaskState> INTERMEDIATE_STATES = new HashSet<>(Arrays
.asList(TaskState.IN_PROGRESS, TaskState.NOT_STARTED, TaskState.STOPPING, TaskState.STOPPED));
private WorkflowControllerDataProvider _dataProvider;
public void updateCache(WorkflowControllerDataProvider cache) {
_dataProvider = cache;
}
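  /**
   * Processes status updates for the given job and computes its new task-to-instance assignment.
   * A minimal usage sketch (the HelixManager, rebalance scheduler, and cluster status monitor
   * inherited from AbstractTaskDispatcher are assumed to have been wired up by the controller
   * pipeline and are not shown):
   * <pre>{@code
   * JobDispatcher jobDispatcher = new JobDispatcher();
   * jobDispatcher.updateCache(dataProvider); // WorkflowControllerDataProvider from the pipeline
   * ResourceAssignment assignment = jobDispatcher
   *     .processJobStatusUpdateAndAssignment(jobName, currentStateOutput, workflowContext);
   * }</pre>
   * @param jobName the job resource name
   * @param currStateOutput current states reported by participants
   * @param workflowCtx the context of the workflow this job belongs to
   * @return the new ResourceAssignment for the job, or an empty assignment if the job cannot be
   *         scheduled in this pipeline run
   */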
public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
// Fetch job configuration
final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
if (jobCfg == null) {
LOG.error("Job configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
if (workflowCfg == null) {
LOG.error("Workflow configuration is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
if (workflowCtx == null) {
LOG.error("Workflow context is NULL for " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
TargetState targetState = workflowCfg.getTargetState();
if (targetState != TargetState.START && targetState != TargetState.STOP) {
LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource
+ ".Stop scheduling job " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
// Stop current run of the job if workflow or job is already in final state (failed or
// completed)
TaskState workflowState = workflowCtx.getWorkflowState();
TaskState jobState = workflowCtx.getJobState(jobName);
// The job is already in a final state (completed/failed).
if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
|| jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
      LOG.info(String.format(
          "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up the job IdealState.",
          workflowResource, jobName, workflowState, jobState));
finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
_rebalanceScheduler.removeScheduledRebalance(jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
if (!isWorkflowReadyForSchedule(workflowCfg)) {
LOG.info("Job is not ready to be run since workflow is not ready " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
_dataProvider.getJobConfigMap(), _dataProvider,
_dataProvider.getAssignableInstanceManager())) {
LOG.info("Job is not ready to run " + jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
// Fetch any existing context information from the property store.
JobContext jobCtx = _dataProvider.getJobContext(jobName);
if (jobCtx == null) {
jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
final long currentTimestamp = System.currentTimeMillis();
jobCtx.setStartTime(currentTimestamp);
jobCtx.setName(jobName);
// This job's JobContext has not been created yet. Since we are creating a new JobContext
// here, we must also create its UserContentStore
TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName,
new ZNRecord(TaskUtil.USER_CONTENT_NODE));
workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
// Since this job has been processed for the first time, we report SubmissionToProcessDelay
// here asynchronously
reportSubmissionToProcessDelay(_dataProvider, _clusterStatusMonitor, workflowCfg, jobCfg,
currentTimestamp);
}
if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
}
    // Fetch the set of enabled live instances that are eligible to run this job (optionally
    // filtered by the job's instance group tag).
Set<String> liveInstances =
jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
: _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job!");
}
TargetState jobTgtState = workflowCfg.getTargetState();
jobState = workflowCtx.getJobState(jobName);
workflowState = workflowCtx.getWorkflowState();
if (INTERMEDIATE_STATES.contains(jobState)
&& (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
|| TaskState.TIMED_OUT.equals(workflowState))) {
jobState = TaskState.TIMING_OUT;
workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
} else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
      // A TIMING_OUT/FAILING/ABORTING job can't be stopped, because all of its tasks are being aborted.
      // Update the running status in the workflow context.
if (jobTgtState == TargetState.STOP) {
if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) {
workflowCtx.setJobState(jobName, TaskState.STOPPED);
} else {
workflowCtx.setJobState(jobName, TaskState.STOPPING);
}
        // The workflow is considered stopped once all of its in-progress jobs are stopped
if (isWorkflowStopped(workflowCtx, workflowCfg)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
} else {
workflowCtx.setWorkflowState(TaskState.STOPPING);
}
} else {
workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
// Workflow is in progress if any task is in progress
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
}
}
    // Will contain the list of partitions that must be explicitly dropped from the ideal state that
    // is stored in ZK.
    Set<Integer> partitionsToDrop = new TreeSet<>();
ResourceAssignment newAssignment =
computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, liveInstances,
currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);
// Update Workflow and Job context in data cache and ZK.
_dataProvider.updateJobContext(jobName, jobCtx);
_dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
LOG.debug("Job " + jobName + " new assignment "
+ Arrays.toString(newAssignment.getMappedPartitions().toArray()));
return newAssignment;
}
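  /**
   * Computes the per-partition assignment for a single job: releases tasks that have reached
   * terminal states, handles failing, timed-out, and completed jobs, and makes additional task
   * assignments where needed. The result is built from the accumulated paMap via
   * toResourceAssignment.
   */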
private ResourceAssignment computeResourceMapping(String jobResource,
WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
Collection<String> liveInstances, CurrentStateOutput currStateOutput,
WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
WorkflowControllerDataProvider cache) {
// Used to keep track of tasks that have already been assigned to instances.
// InstanceName -> Set of task partitions assigned to that instance in this iteration
Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
// Used to keep track of tasks that have failed, but whose failure is acceptable
Set<Integer> skippedPartitions = new HashSet<>();
// Keeps a mapping of (partition) -> (instance, state)
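    // e.g. a hypothetical snapshot: {0 -> (localhost_12918, RUNNING), 1 -> (localhost_12919, DROPPED)}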
Map<Integer, PartitionAssignment> paMap = new TreeMap<>();
Set<String> excludedInstances =
getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache);
// Process all the current assignments of tasks.
TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalculator(jobCfg, cache);
Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
workflowConfig, workflowCtx, cache.getIdealStates());
if (allPartitions == null || allPartitions.isEmpty()) {
// Empty target partitions, mark the job as FAILED.
String failureMsg =
"Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
LOG.info(failureMsg);
jobCtx.setInfo(failureMsg);
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
markAllPartitionsError(jobCtx);
return new ResourceAssignment(jobResource);
}
    // This set contains all task pIds that need to be dropped because their requestedState is DROPPED.
    // Newer versions of participants, upon connection reset, set task requestedStates to DROPPED.
    // These dropping transitions are prioritized above all other task state transition assignments.
Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop);
updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
long currentTime = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
+ currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
}
    // Release resources for tasks in terminal states
updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);
addGivenUpPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
|| (jobCfg.getTargetResource() != null
&& cache.getIdealState(jobCfg.getTargetResource()) != null
&& !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
return buildEmptyAssignment(jobResource, currStateOutput);
}
workflowCtx.setJobState(jobResource, TaskState.FAILING);
// Drop all assigned but not given-up tasks
for (int pId : jobCtx.getPartitionSet()) {
String instance = jobCtx.getAssignedParticipant(pId);
if (jobCtx.getPartitionState(pId) != null && !isTaskGivenup(jobCtx, jobCfg, pId)) {
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
}
Partition partition = new Partition(pName(jobResource, pId));
Message pendingMessage =
currStateOutput.getPendingMessage(jobResource, partition, instance);
// While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
// so that Helix will cancel the transition.
if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
}
}
return toResourceAssignment(jobResource, paMap);
}
if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, currStateOutput)) {
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
return buildEmptyAssignment(jobResource, currStateOutput);
}
if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap(),
cache);
_clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
jobCtx.getFinishTime() - jobCtx.getStartTime());
_rebalanceScheduler.removeScheduledRebalance(jobResource);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
return buildEmptyAssignment(jobResource, currStateOutput);
}
    // If the job is being timed out and no task is running (for whatever reason), the IdealState can
    // be deleted and all tasks can be dropped (note that Helix doesn't track whether the drop
    // succeeds or not).
if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(),
jobResource);
return buildEmptyAssignment(jobResource, currStateOutput);
}
// For delayed tasks, trigger a rebalance event for the closest upcoming ready time
scheduleForNextTask(jobResource, jobCtx, currentTime);
// Make additional task assignments if needed.
if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
&& jobTgtState == TargetState.START) {
handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances,
jobResource, currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
assignedPartitions, paMap, skippedPartitions, taskAssignmentCal, allPartitions,
currentTime, liveInstances);
}
return toResourceAssignment(jobResource, paMap);
}
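  /**
   * Converts the internal partition assignment map into Helix's ResourceAssignment model. For
   * illustration only, a hypothetical entry {2 -> (localhost_12918, RUNNING)} becomes the replica
   * map {localhost_12918=RUNNING} keyed by the Partition named {@code pName(jobResource, 2)}.
   */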
private ResourceAssignment toResourceAssignment(String jobResource,
Map<Integer, PartitionAssignment> paMap) {
// Construct a ResourceAssignment object from the map of partition assignments.
ResourceAssignment ra = new ResourceAssignment(jobResource);
for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
PartitionAssignment pa = e.getValue();
ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
ImmutableMap.of(pa._instance, pa._state));
}
return ra;
}
private boolean isJobFinished(JobContext jobContext, String jobResource,
CurrentStateOutput currentStateOutput) {
for (int pId : jobContext.getPartitionSet()) {
TaskPartitionState state = jobContext.getPartitionState(pId);
Partition partition = new Partition(pName(jobResource, pId));
String instance = jobContext.getAssignedParticipant(pId);
Message pendingMessage =
currentStateOutput.getPendingMessage(jobResource, partition, instance);
// If state is INIT but is pending INIT->RUNNING, it's not yet safe to say the job finished
if (state == TaskPartitionState.RUNNING
|| (state == TaskPartitionState.INIT && pendingMessage != null)) {
return false;
}
}
return true;
}
/**
   * Checks if the job has completed. Looks at the states of all tasks of the job; there are three
   * kinds: completed, given up, and not given up. The job is complete if every task is either
   * completed or given up, and the number of given-up tasks is within the job's failure threshold.
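   * For example (hypothetical numbers): with a failure threshold of 2, a job whose tasks end up as
   * 9 COMPLETED and 1 given up is complete, while a job with 3 given-up tasks is not.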
*/
private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, JobConfig cfg) {
int numOfGivenUpTasks = 0;
// Iterate through all tasks, if any one indicates the job has not completed, return false.
for (Integer pId : allPartitions) {
TaskPartitionState state = ctx.getPartitionState(pId);
if (state != TaskPartitionState.COMPLETED) {
if (!isTaskGivenup(ctx, cfg, pId)) {
return false;
}
// If the task is given up, there's still chance the job has completed because of job
// failure threshold.
numOfGivenUpTasks++;
}
}
return numOfGivenUpTasks <= cfg.getFailureThreshold();
}
/**
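   * Builds the instance -> assigned-task-partitions view from the current states. For illustration
   * only (hypothetical data): if instance_1 reports task partitions 0 and 3 and partition 3 has
   * requestedState DROPPED, the result maps instance_1 -> {0, 3} and tasksToDrop maps
   * instance_1 -> {3}.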
   * @param liveInstances the enabled live instances eligible to run tasks for this job
   * @param currStateOutput current states; used as the source of truth so that current states
   *          carried over from expired sessions are also accounted for
* @param jobName job name
* @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
* @return instance -> partitionIds from currentState, if the instance is still live
*/
protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
Map<String, Set<Integer>> tasksToDrop) {
Map<String, SortedSet<Integer>> result = new HashMap<>();
for (String instance : liveInstances) {
result.put(instance, new TreeSet<>());
}
// Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
// Add all pIds existing in CurrentStateOutput
// We need to add these pIds to result and update their states in JobContext in
// updatePreviousAssignedTasksStatus method.
Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName);
for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet()) {
// Get all (instance -> currentState) mappings
for (Map.Entry<String, String> instanceToCurrState : entry.getValue().entrySet()) {
String instance = instanceToCurrState.getKey();
String requestedState =
currStateOutput.getRequestedState(jobName, entry.getKey(), instance);
int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
if (result.containsKey(instance)) {
result.get(instance).add(pId);
          // Check if this task needs to be dropped. If so, add it to tasksToDrop regardless of its
          // current state so that it will be dropped.
          // This covers tasks on a reconnected instance (with a new sessionId) whose requestedState
          // has been set to DROPPED.
if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) {
if (!tasksToDrop.containsKey(instance)) {
tasksToDrop.put(instance, new HashSet<>());
}
tasksToDrop.get(instance).add(pId);
}
}
}
}
return result;
}
/**
   * If a partition recorded in the job context is missing from currentInstanceToTaskAssignments,
   * it is added back from the context; otherwise the assignment map is left unchanged.
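   * For illustration (hypothetical data): if the job context records partition 5 as RUNNING on
   * instance_2, but instance_2's current-state set does not contain 5, then 5 is added to
   * instance_2's set in the map.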
* @param jobCtx Job Context
* @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
*/
protected void updateInstanceToTaskAssignmentsFromContext(JobContext jobCtx,
Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments) {
for (Integer partition : jobCtx.getPartitionSet()) {
      // We must add all active task pIds back here.
      // States other than RUNNING and INIT do not need to be added.
      // The logic here mirrors the getCurrentInstanceToTaskAssignments method.
if (jobCtx.getPartitionState(partition) == TaskPartitionState.RUNNING
|| jobCtx.getPartitionState(partition) == TaskPartitionState.INIT) {
String instance = jobCtx.getAssignedParticipant(partition);
if (instance != null) {
if (currentInstanceToTaskAssignments.containsKey(instance)
&& !currentInstanceToTaskAssignments.get(instance).contains(partition)) {
currentInstanceToTaskAssignments.get(instance).add(partition);
}
}
}
}
}
/**
   * If the job is a targeted job, use FixedTargetTaskAssignmentCalculator. Otherwise, use
   * ThreadCountBasedTaskAssignmentCalculator. Both calculators support quota-based scheduling.
   * @param jobConfig the job's configuration
   * @param cache the workflow controller's data provider
   * @return the TaskAssignmentCalculator appropriate for this job
*/
private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig,
WorkflowControllerDataProvider cache) {
AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
if (TaskUtil.isGenericTaskJob(jobConfig)) {
return new ThreadCountBasedTaskAssignmentCalculator(new ThreadCountBasedTaskAssigner(),
assignableInstanceManager);
}
return new FixedTargetTaskAssignmentCalculator(assignableInstanceManager);
}
}