blob: 1ed9ab01b2b8b7093da688325a8b7849a9bd0053 [file] [log] [blame]
package org.apache.helix.task;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.model.builder.IdealStateBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Task dispatcher that works at workflow level: it updates the status of a workflow
 * ({@code updateWorkflowStatus}) and schedules the jobs of a workflow ({@code assignWorkflow}),
 * delegating per-job processing to a {@link JobDispatcher}.
 */
public class WorkflowDispatcher extends AbstractTaskDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(WorkflowDispatcher.class);
// Workflow states treated as final; used to skip timeout and STOP handling for such workflows.
private static final Set<TaskState> finalStates = new HashSet<>(
Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT));
// Cluster data snapshot; replaced on every updateCache() call.
private WorkflowControllerDataProvider _clusterDataCache;
// Created on the first updateCache() call and used by processJob().
private JobDispatcher _jobDispatcher;
public void updateCache(WorkflowControllerDataProvider cache) {
_clusterDataCache = cache;
if (_jobDispatcher == null) {
_jobDispatcher = new JobDispatcher();
// Split it into status update and assign. But there are a couple of pieces of data that need
// to be passed around.
/**
 * Updates the status of a single workflow. As far as the visible code shows, the steps are:
 * check for a DELETE target state, handle timeout for non-queue workflows that are not in a
 * final state, detect a finished workflow, handle finished workflows (expiry / scheduling a
 * later cleanup rebalance), process in-flight jobs from the runtime job DAG, handle a STOP
 * target state, and finally write the workflow context back to the cache.
 *
 * NOTE(review): this method has lost statements and closing braces (several string literals
 * are not attached to any call, and no early returns are visible). The per-line notes below
 * mark the gaps; do not rely on the visible control flow without checking the upstream file.
 *
 * @param workflow name of the workflow
 * @param workflowCfg configuration of the workflow; a warning is logged when it is null
 * @param workflowCtx context of the workflow, read and written back to the cache
 * @param currentStateOutput current state passed through to job processing
 * @param bestPossibleOutput output passed through to job processing
 */
public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
// Fetch workflow configuration and context
if (workflowCfg == null) {
LOG.warn("Workflow configuration is NULL for " + workflow);
// NOTE(review): no return and no closing brace are visible after this warning, yet
// workflowCfg is dereferenced right below -- presumably an early return is missing.
// Step 1: Check for deletion - if so, we don't need to go through further steps
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
// NOTE(review): the call that should receive the message on the next line (presumably a log
// statement) and the rest of the DELETE branch are not visible.
if (targetState == TargetState.DELETE) {"Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
// Step 2: handle timeout, which should have higher priority than STOP
// Only generic workflows time out and schedule a rebalance for the timeout. This step is
// skipped if the workflow has already timed out. Job queues ignore this setup.
if (!workflowCfg.isJobQueue() && !finalStates.contains(workflowCtx.getWorkflowState())) {
// If timeout point has already been passed, it will not be scheduled
scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
&& isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
// NOTE(review): only the cache write is visible in this branch; the statement that changes
// the workflow state in the context is presumably missing.
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
// We should not return after setting timeout, as in case the workflow is stopped already
// marking it timeout will not trigger rebalance pipeline as we are not listening on
// PropertyStore change, nor will we schedule rebalance for timeout as at this point,
// workflow is already timed-out. We should let the code proceed and wait for schedule
// future cleanup work
long currentTime = System.currentTimeMillis();
// Step 3: Check and process finished workflow context (confusing,
// but it's inside isWorkflowFinished())
// Check if workflow has been finished and mark it if it is. Also update cluster status
// monitor if provided
// Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed
// This is to handle TIMED_OUT only
if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
updateWorkflowMonitor(workflowCtx, workflowCfg);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
// Step 4: Handle finished workflows
// NOTE(review): the messages on the finished / expired lines below are not attached to any
// call, and the cleanup action of the expired branch is not visible.
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {"Workflow " + workflow + " is finished.");
long expiryTime = workflowCfg.getExpiry();
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {"Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
} else {
// schedule future cleanup work
long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
_rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
// NOTE(review): despite its name, this set is initialised with every job name present in the
// context; any filtering down to jobs in final states, and what is done with the set, is
// not visible.
Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
if (jobWithFinalStates.size() > 0) {
// Update jobs already inflight
RuntimeJobDag runtimeJobDag = _clusterDataCache.getTaskDataCache().getRuntimeJobDag(workflow);
if (runtimeJobDag != null) {
for (String inflightJob : runtimeJobDag.getInflightJobList()) {
// Only process in-flight jobs whose recorded start time has been reached.
if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(inflightJob)) {
processJob(inflightJob, currentStateOutput, bestPossibleOutput, workflowCtx);
} else {
// NOTE(review): the format string below is not attached to any call and its argument list is
// cut off; judging by its text it belongs to the "runtimeJobDag == null" case rather than to
// the inner else shown above -- confirm against the upstream file.
"Failed to find runtime job DAG for workflow %s, existing runtime jobs may not be processed correctly for it",
// Step 5: handle workflow that should STOP
// For workflows that have already reached final states, STOP should not take into effect.
if (!finalStates.contains(workflowCtx.getWorkflowState())
&& TargetState.STOP.equals(targetState)) {"Workflow " + workflow + " is marked as stopped. Workflow state is " + workflowCtx.getWorkflowState());
// NOTE(review): the statement that changes the workflow state when isWorkflowStopped() holds
// is not visible; only the cache writes remain.
if (isWorkflowStopped(workflowCtx, workflowCfg)) {
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
/**
 * Schedules the jobs of a workflow when the workflow is ready: checks
 * {@code isWorkflowReadyForSchedule}, then {@code scheduleWorkflowIfReady}, calls
 * {@code scheduleJobs} when the latter reports readiness, and writes the workflow context back
 * to the cache.
 *
 * NOTE(review): closing braces, early returns and the tails of two calls are missing from this
 * method; see the per-line notes.
 *
 * @param workflow name of the workflow
 * @param workflowCfg configuration of the workflow
 * @param workflowCtx context of the workflow
 * @param currentStateOutput current state passed through to job scheduling
 * @param bestPossibleOutput output passed through to job scheduling
 */
public void assignWorkflow(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
// Fetch workflow configuration and context
if (workflowCfg == null) {
// Already logged in status update.
// NOTE(review): the body of this null check is not visible (presumably an early return);
// workflowCfg is used right below.
if (!isWorkflowReadyForSchedule(workflowCfg)) {"Workflow " + workflow + " is not ready to schedule");
// set the timer to trigger future schedule
// NOTE(review): the third argument (the time to schedule the rebalance at) is cut off.
_rebalanceScheduler.scheduleRebalance(_manager, workflow,
// Check for readiness, and stop processing if it's not ready
// NOTE(review): the last argument is cut off; scheduleWorkflowIfReady() takes a TaskDataCache
// as its fourth parameter.
boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx,
if (isReady) {
// Schedule jobs from this workflow.
scheduleJobs(workflow, workflowCfg, workflowCtx, _clusterDataCache.getJobConfigMap(),
_clusterDataCache, currentStateOutput, bestPossibleOutput);
} else {
LOG.debug("Workflow " + workflow + " is not ready to be scheduled.");
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
public WorkflowContext getOrInitializeWorkflowContext(String workflowName, TaskDataCache cache) {
WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName);
if (workflowCtx == null) {
if (cache.getWorkflowConfig(workflowName) != null) {
workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
LOG.debug("Workflow context is created for " + workflowName);
} else {
// If config is null, do not initialize context
LOG.error("Workflow context is not created for {}. Workflow config is missing!",
return workflowCtx;
/**
 * Figure out whether the jobs in the workflow should be run,
 * and if they are ready, schedule them.
 */
private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
WorkflowControllerDataProvider clusterDataCache, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
// Walks the job DAG of the workflow via getNextJob(): jobs that are already started are
// processed, ready jobs whose start time has been reached are posted to the cluster and
// processed, and for jobs whose start time lies in the future a rebalance is scheduled at the
// earliest such time.
// NOTE(review): closing braces and, apparently, control-flow statements (return / continue /
// break) are missing from this method; see the notes below.
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
LOG.debug("Jobs from recurring workflow are not schedule-able");
// NOTE(review): nothing after the debug message leaves the method -- presumably a return is
// missing.
int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx);
// NOTE(review): scheduledJobs is never incremented in the visible code, so the parallel-jobs
// check below would always compare 0 against getParallelJobs().
int scheduledJobs = 0;
// Earliest future job start time seen so far; Long.MAX_VALUE means "none".
long timeToSchedule = Long.MAX_VALUE;
// Prefer the runtime DAG from the task data cache; fall back to the DAG from the config.
JobDag jobDag = clusterDataCache.getTaskDataCache().getRuntimeJobDag(workflow);
if (jobDag == null) {
jobDag = workflowCfg.getJobDag();
String nextJob = jobDag.getNextJob();
// Assign new jobs
while (nextJob != null) {
String job = nextJob;
TaskState jobState = workflowCtx.getJobState(job);
// A job with any state other than NOT_STARTED only needs processing, not scheduling.
if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Job " + job + " is already started or completed.");
processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
nextJob = jobDag.getNextJob();
// NOTE(review): no "continue" is visible after advancing to the next job.
if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Workflow %s already have enough job in progress, "
+ "scheduledJobs(s)=%d, stop scheduling more jobs", workflow, scheduledJobs));
// NOTE(review): the message says scheduling stops, but no "break" is visible.
// TODO: Part of isJobReadyToSchedule() is already done by RuntimeJobDag. Because there is
// some duplicate logic, consider refactoring. The check here and the ready-list in
// RuntimeJobDag may cause conflicts.
// check ancestor job status
if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap,
clusterDataCache, clusterDataCache.getAssignableInstanceManager())) {
// NOTE(review): Map.get() may return null here and no null check is visible before
// computeStartTimeForJob() dereferences jobConfig.
JobConfig jobConfig = jobConfigMap.get(job);
long calculatedStartTime = computeStartTimeForJob(workflowCtx, job, jobConfig);
// Time is not ready. Set a trigger and update the start time.
// Check if the job is ready to be executed.
if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(job)) {
scheduleSingleJob(job, jobConfig);
workflowCtx.setJobState(job, TaskState.NOT_STARTED);
processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
} else {
timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
nextJob = jobDag.getNextJob();
// A rebalance time of -1 is mapped to Long.MAX_VALUE (presumably -1 means that no rebalance
// is scheduled), so a rebalance is only (re)scheduled when it would fire earlier than the
// one currently registered.
long currentScheduledTime =
_rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
: _rebalanceScheduler.getRebalanceTime(workflow);
if (timeToSchedule < currentScheduledTime) {
_rebalanceScheduler.scheduleRebalance(_manager, workflow, timeToSchedule);
private void processJob(String job, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput, WorkflowContext workflowCtx) {
try {
ResourceAssignment resourceAssignment =
_jobDispatcher.processJobStatusUpdateAndAssignment(job, currentStateOutput, workflowCtx);
updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput);
} catch (Exception e) {
LogUtil.logWarn(LOG, _clusterDataCache.getClusterEventId(),
String.format("Failed to compute job assignment for job %s", job));
/**
 * Posts new job to cluster
 */
private void scheduleSingleJob(String jobResource, JobConfig jobConfig) {
// Creates the cluster-side artifacts for a job: user content store entry, resource, resource
// config (one map field per task config) and an ideal state with one empty list field and one
// empty map field per partition.
// NOTE(review): closing braces and several statements are missing from this method; see the
// notes below.
HelixAdmin admin = _manager.getClusterManagmentTool();
IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource);
// NOTE(review): the call that should receive the message on the next line (presumably a log
// statement) and whatever else the branch does (e.g. returning early) are not visible.
if (jobIS != null) {"Job " + jobResource + " idealstate already exists!");
// Set up job resource based on partitions from target resource
// Create the UserContentStore for the job first
TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
new ZNRecord(TaskUtil.USER_CONTENT_NODE));
// NOTE(review): getTaskConfigMap() is dereferenced here without a null check, while the same
// map is null-checked further down -- one of the two is inconsistent.
int numPartitions = jobConfig.getTaskConfigMap().size();
// Without explicit task configs, the partition count is taken from the target resource.
if (numPartitions == 0) {
IdealState targetIs =
admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
if (targetIs == null) {
LOG.warn("Target resource does not exist for job " + jobResource);
// do not need to fail here, the job will be marked as failure immediately when job starts
// running.
} else {
numPartitions = targetIs.getPartitionSet().size();
// NOTE(review): the remaining arguments of addResource() are cut off.
admin.addResource(_manager.getClusterName(), jobResource, numPartitions,
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
// Set the job configuration
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
HelixProperty resourceConfig = new HelixProperty(jobResource);
// NOTE(review): nothing visible copies the job-level config into resourceConfig; only the
// per-task map fields below are set before it is written.
Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
if (taskConfigMap != null) {
for (TaskConfig taskConfig : taskConfigMap.values()) {
resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
// Push out new ideal state based on number of target partitions
IdealStateBuilder builder = new CustomModeISBuilder(jobResource);
// NOTE(review): the bodies of the two ifs below and the right-hand side of the jobIS
// assignment are missing; presumably the builder is configured and then built.
if (jobConfig.getInstanceGroupTag() != null) {
if (jobConfig.isDisableExternalView()) {
jobIS =;
// Partitions are named "<jobResource>_<index>".
for (int i = 0; i < numPartitions; i++) {
jobIS.getRecord().setListField(jobResource + "_" + i, new ArrayList<>());
jobIS.getRecord().setMapField(jobResource + "_" + i, new HashMap<>());
admin.setResourceIdealState(_manager.getClusterName(), jobResource, jobIS);
/**
 * Check if a workflow is ready to schedule, and schedule a rebalance if it is not
 * @param workflow the Helix resource associated with the workflow
 * @param workflowCfg the workflow to check
 * @param workflowCtx the current workflow context
 * @param cache the task data cache used to look up the context of the last scheduled run
 * @return true if the workflow is ready for schedule, false if not ready
 */
private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, TaskDataCache cache) {
// NOTE(review): closing braces and several statements are missing from this method, so the
// nesting of the branches below has to be inferred; see the notes.
// non-scheduled workflow is ready to run immediately.
if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
return true;
// Figure out when this should be run, and if it's ready, then just run it
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
Date startTime = scheduleConfig.getStartTime();
long currentTime = new Date().getTime();
// Positive: the start time is still in the future; zero or negative: it has been reached.
long delayFromStart = startTime.getTime() - currentTime;
if (delayFromStart <= 0) {
// Recurring workflows are just templates that spawn new workflows
if (scheduleConfig.isRecurring()) {
// Skip scheduling this workflow if it's not in a start state
if (!workflowCfg.getTargetState().equals(TargetState.START)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip scheduling since the workflow has not been started " + workflow);
return false;
// Skip scheduling this workflow again if the previous run (if any) is still active
String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
if (lastScheduled != null) {
WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
// NOTE(review): the call that should receive the message below (presumably a log statement)
// is missing.
if (lastWorkflowCtx != null
&& lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {"Skip scheduling since last schedule has not completed yet " + lastScheduled);
return false;
// Figure out how many jumps are needed, thus the time to schedule the next workflow
// The negative of the delay is the amount of time past the start time
// NOTE(review): the right-hand side of "period" is missing; it is used as a divisor below,
// so a value of 0 would throw an ArithmeticException.
long period =
// Number of whole periods elapsed since the start time, and the start of the current period.
long offsetMultiplier = (-delayFromStart) / period;
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
// Now clone the workflow if this clone has not yet been created
// NOTE(review): no time zone is set on this formatter in the visible code, so the generated
// name would depend on the JVM default zone; the file imports java.util.TimeZone without a
// visible use, so a setTimeZone() call may be missing here.
DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to start workflow " + newWorkflowName);
if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
// NOTE(review): no use of "driver" is visible; presumably it starts the cloned workflow
// inside the try block below, whose body is missing.
TaskDriver driver = new TaskDriver(_manager);
if (clonedWf != null) {
try {
// Start the cloned workflow
} catch (Exception e) {
LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
// Persist workflow start regardless of success to avoid retrying and failing
// NOTE(review): the statement that persists the start is not visible.
// Change the time to trigger the pipeline to that of the next run
_rebalanceScheduler.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
} else {
// one time workflow.
// Remove any timers that are past-time for this workflow
long scheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
// NOTE(review): the body of this if (presumably removing the stale timer) is missing.
if (scheduledTime > 0 && currentTime > scheduledTime) {
return true;
} else {
// set the timer to trigger future schedule
_rebalanceScheduler.scheduleRebalance(_manager, workflow, startTime.getTime());
return false;
/**
 * Create a new workflow based on an existing one
 * @param manager connection to Helix
 * @param origWorkflowName the name of the existing workflow
 * @param newWorkflowName the name of the new workflow
 * @param newStartTime a provided start time that deviates from the desired start time
 * @return the cloned workflow, or null if there was a problem cloning the existing one
 */
public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
String newWorkflowName, Date newStartTime) {
// Builds a new Workflow named newWorkflowName from the resource configs of origWorkflowName:
// the workflow config is rebuilt from a map, every job of the original DAG that still has a
// resource config is re-added under its de-namespaced name, and parent/child dependencies are
// copied.
// NOTE(review): closing braces, several right-hand sides and the final return are missing
// from this method; see the notes below.
// Read all resources, including the workflow and jobs of interest
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// NOTE(review): the right-hand side is missing; presumably all resource configs are read
// through the accessor.
Map<String, HelixProperty> resourceConfigMap =
if (!resourceConfigMap.containsKey(origWorkflowName)) {
LOG.error("No such workflow named " + origWorkflowName);
return null;
if (resourceConfigMap.containsKey(newWorkflowName)) {
LOG.error("Workflow with name " + newWorkflowName + " already exists!");
return null;
// Create a new workflow with a new name
// NOTE(review): the right-hand side is missing; presumably the fields of the original
// workflow's config.
Map<String, String> workflowConfigsMap =
WorkflowConfig.Builder workflowConfigBlder = WorkflowConfig.Builder.fromMap(workflowConfigsMap);
// Set the schedule, if applicable
if (newStartTime != null) {
// NOTE(review): scheduleConfig is created but no statement applying it to the builder is
// visible.
ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
// NOTE(review): the right-hand side is missing; presumably the result of building
// workflowConfigBlder.
WorkflowConfig workflowConfig =;
JobDag jobDag = workflowConfig.getJobDag();
Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
// NOTE(review): workflowConfig is not visibly attached to workflowBuilder.
Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
// Add each job back as long as the original exists
Set<String> namespacedJobs = jobDag.getAllNodes();
for (String namespacedJob : namespacedJobs) {
if (resourceConfigMap.containsKey(namespacedJob)) {
// Copy over job-level and task-level configs
String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
// NOTE(review): "Lists" is not among the visible imports (presumably Guava's
// com.google.common.collect.Lists).
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
// NOTE(review): taskConfig is not visibly added to taskConfigs, and taskConfigs is not
// visibly handed to jobCfgBuilder.
TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
workflowBuilder.addJob(job, jobCfgBuilder);
// Add dag dependencies
Set<String> children = parentsToChildren.get(namespacedJob);
if (children != null) {
for (String namespacedChild : children) {
String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
workflowBuilder.addParentChildDependency(job, child);
// NOTE(review): the statement returning the built workflow is not visible.
/**
 * Clean up a workflow. This removes the workflow config, idealstate, externalview and workflow
 * contexts associated with this workflow, and all jobs information, including their configs,
 * context, IS and EV.
 */
// NOTE(review): the message on the signature line below is not attached to any call
// (presumably a log statement); closing braces are missing throughout this method.
private void cleanupWorkflow(String workflow) {"Cleaning up workflow: " + workflow);
// Removes a workflow and its jobs via TaskUtil.removeWorkflow() and, only when that succeeds,
// drops the corresponding entries from the task data cache. Applies only to workflows that
// are terminable or whose target state is DELETE.
// NOTE(review): workflowcfg is dereferenced without a null check; confirm that the cache
// always holds a config for workflows that reach this method.
WorkflowConfig workflowcfg = _clusterDataCache.getWorkflowConfig(workflow);
if (workflowcfg.isTerminable() || workflowcfg.getTargetState() == TargetState.DELETE) {
Set<String> jobs = workflowcfg.getJobDag().getAllNodes();
// Remove all pending timer tasks for this workflow if exists
// NOTE(review): the body of this loop is missing; presumably it removes the scheduled
// rebalance of each job.
for (String job : jobs) {
if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
_manager.getHelixPropertyStore(), workflow, jobs)) {
LOG.warn("Failed to clean up workflow " + workflow);
} else {
// Only remove from the cache when removing the whole workflow succeeded. Otherwise, the
// batch write would clean all the contexts even if Configs and IdealStates still exist, and
// all the workflows and jobs would be rescheduled again.
removeContextsAndPreviousAssignment(workflow, jobs, _clusterDataCache.getTaskDataCache());
// NOTE(review): the message below does not match the condition above -- this branch is taken
// when the workflow is neither terminable nor set to DELETE, but the text says
// "non-terminable". The call receiving the message is also missing.
} else {"Did not clean up workflow " + workflow
+ " because neither the workflow is non-terminable nor is set to DELETE.");
// Called by cleanupWorkflow() after a workflow was removed successfully; judging by its name
// it drops the contexts and previous assignments of the workflow and its jobs from the cache.
// NOTE(review): the body of the loop and every statement that touches "cache" or "workflow"
// are missing, so the actual removal calls cannot be confirmed from this file.
private void removeContextsAndPreviousAssignment(String workflow, Set<String> jobs,
TaskDataCache cache) {
// A null job set is tolerated; the per-job work is skipped in that case.
if (jobs != null) {
for (String job : jobs) {
private long computeStartTimeForJob(WorkflowContext workflowCtx, String job,
JobConfig jobConfig) {
// Since the start time is calculated base on the time of completion of parent jobs for this
// job, the calculated start time should only be calculated once. Persist the calculated time
// in WorkflowContext znode.
long calculatedStartTime = workflowCtx.getJobStartTime(job);
if (calculatedStartTime < 0) {
// Calculate the start time if it is not already calculated
calculatedStartTime = System.currentTimeMillis();
// If the start time is not calculated before, do the math.
if (jobConfig.getExecutionDelay() >= 0) {
calculatedStartTime += jobConfig.getExecutionDelay();
calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
workflowCtx.setJobStartTime(job, calculatedStartTime);
return calculatedStartTime;