package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import com.google.common.collect.Lists;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
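/**
 * Dispatches workflow-level logic for the task framework: updates workflow status (deletion,
 * timeout, completion, expiry, stop/resume) and schedules the workflow's jobs, delegating
 * per-job status updates and assignment computation to a {@link JobDispatcher}.
 */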
public class WorkflowDispatcher extends AbstractTaskDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(WorkflowDispatcher.class);
private WorkflowControllerDataProvider _clusterDataCache;
private JobDispatcher _jobDispatcher;
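/**
 * Refresh the cached cluster data and lazily initialize the underlying {@link JobDispatcher}
 * that this dispatcher delegates job-level work to.
 */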
public void updateCache(WorkflowControllerDataProvider cache) {
_clusterDataCache = cache;
if (_jobDispatcher == null) {
_jobDispatcher = new JobDispatcher();
}
_jobDispatcher.init(_manager);
_jobDispatcher.updateCache(cache);
_jobDispatcher.setClusterStatusMonitor(_clusterStatusMonitor);
}
// Split into a status-update step and an assignment step. Note that several pieces of data
// need to be passed around between them.
public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
// Fetch workflow configuration and context
if (workflowCfg == null) {
LOG.warn("Workflow configuration is NULL for {}", workflow);
return;
}
// Step 1: Check for deletion. If the workflow is marked for deletion, clean it up and skip
// the remaining steps.
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
cleanupWorkflow(workflow);
return;
}
// Step 2: Handle timeout, which takes priority over STOP.
// Only generic workflows can time out, so schedule a rebalance for the timeout point. The
// scheduling is skipped if the workflow has already timed out; job queues ignore this step.
if (!workflowCfg.isJobQueue()
&& !TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())) {
// If timeout point has already been passed, it will not be scheduled
scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
&& isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
// We should not return after setting the timeout: if the workflow is already stopped, marking
// it timed out will not trigger the rebalance pipeline (we are not listening on PropertyStore
// changes), nor will a rebalance be scheduled for the timeout, since at this point the
// workflow has already timed out. Let the code proceed so that future cleanup work can be
// scheduled.
}
long currentTime = System.currentTimeMillis();
// Step 3: Check whether the workflow has finished and process its context (confusingly, the
// processing happens inside isWorkflowFinished()). If it has finished, mark it and update the
// cluster status monitor if one is provided.
// Note that COMPLETED and FAILED are marked in markJobComplete / markJobFailed;
// this handles TIMED_OUT only.
if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
workflowCtx.setFinishTime(currentTime);
updateWorkflowMonitor(workflowCtx, workflowCfg);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
// Step 4: Handle finished workflows
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
LOG.debug("Workflow {} is finished.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
long expiryTime = workflowCfg.getExpiry();
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", workflow);
cleanupWorkflow(workflow);
} else {
// schedule future cleanup work
long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
_rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
}
return;
}
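// For job queues and other non-terminable workflows, purge context state for jobs that are no
// longer in the DAG (e.g. jobs removed after reaching a final state).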
if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
Set<String> jobsWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
jobsWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
if (!jobsWithFinalStates.isEmpty()) {
workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
workflowCtx.removeJobStates(jobsWithFinalStates);
workflowCtx.removeJobStartTime(jobsWithFinalStates);
}
}
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
// Step 5: Handle a workflow that should STOP.
// STOP should not take effect for workflows that have already reached a final state.
if (!TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())
&& TargetState.STOP.equals(targetState)) {
if (isWorkflowStopped(workflowCtx, workflowCfg) && workflowCtx.getWorkflowState() != TaskState.STOPPED) {
LOG.debug("Workflow {} is marked as stopped. Workflow state is {}", workflow,
workflowCtx.getWorkflowState());
workflowCtx.setWorkflowState(TaskState.STOPPED);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
return;
}
// Step 6: Handle a workflow that should go to the IN_PROGRESS state after it has been resumed.
// This block is added to make sure workflow/queue context state becomes IN_PROGRESS for the
// case where a queue which has been stopped before is just resumed and the queue does not
// contain any job that needs to be run.
if (targetState.equals(TargetState.START)
&& workflowCtx.getWorkflowState() == TaskState.STOPPED) {
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
}
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
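/**
 * Process all jobs that are already in flight for this workflow so that their status and
 * assignments are reflected in the best possible state output.
 */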
private void updateInflightJobs(String workflow, WorkflowContext workflowCtx,
CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) {
// Update jobs already inflight
RuntimeJobDag runtimeJobDag = _clusterDataCache.getTaskDataCache().getRuntimeJobDag(workflow);
if (runtimeJobDag != null) {
for (String inflightJob : runtimeJobDag.getInflightJobList()) {
if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(inflightJob)) {
processJob(inflightJob, currentStateOutput, bestPossibleOutput, workflowCtx);
}
}
} else {
LOG.warn(
"Failed to find the runtime job DAG for workflow {}; its in-flight jobs may not be processed correctly",
workflow);
}
}
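/**
 * Assign the workflow: schedule the workflow itself if it is ready, schedule any of its jobs
 * that are ready to run, and persist the updated workflow context.
 */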
public void assignWorkflow(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
// Fetch workflow configuration and context
if (workflowCfg == null) {
// Already logged in status update.
return;
}
if (!isWorkflowReadyForSchedule(workflowCfg)) {
LOG.info("Workflow {} is not ready to schedule, schedule future rebalance at {}", workflow,
workflowCfg.getStartTime().getTime());
// set the timer to trigger future schedule
_rebalanceScheduler
.scheduleRebalance(_manager, workflow, workflowCfg.getStartTime().getTime());
return;
}
// Check for readiness, and stop processing if it's not ready
boolean isReady = scheduleWorkflowIfReady(workflow, workflowCfg, workflowCtx,
_clusterDataCache.getTaskDataCache());
if (isReady) {
// Schedule jobs from this workflow.
scheduleJobs(workflow, workflowCfg, workflowCtx, _clusterDataCache.getJobConfigMap(),
_clusterDataCache, currentStateOutput, bestPossibleOutput);
} else {
LOG.debug("Workflow {} is not ready to be scheduled.", workflow);
}
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
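/**
 * Fetch the workflow context from the cache, creating a fresh context (with the start time set
 * to now) if none exists yet. Returns null if the workflow config itself is missing.
 */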
public WorkflowContext getOrInitializeWorkflowContext(String workflowName, TaskDataCache cache) {
WorkflowContext workflowCtx = cache.getWorkflowContext(workflowName);
if (workflowCtx == null) {
if (cache.getWorkflowConfig(workflowName) != null) {
workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
workflowCtx.setStartTime(System.currentTimeMillis());
workflowCtx.setName(workflowName);
LOG.debug("Workflow context is created for " + workflowName);
} else {
// If config is null, do not initialize context
LOG.error("Workflow context is not created for {}. Workflow config is missing!",
workflowName);
}
}
return workflowCtx;
}
/**
* Figure out whether the jobs in the workflow should be run,
* and schedule the ones that are ready
*/
private void scheduleJobs(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, Map<String, JobConfig> jobConfigMap,
WorkflowControllerDataProvider clusterDataCache, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
if (scheduleConfig != null && scheduleConfig.isRecurring()) {
LOG.debug("Jobs from recurring workflow {} are not schedule-able", workflow);
return;
}
int inCompleteAllJobCount = TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx);
int scheduledJobs = 0;
long timeToSchedule = Long.MAX_VALUE;
JobDag jobDag = clusterDataCache.getTaskDataCache().getRuntimeJobDag(workflow);
if (jobDag == null) {
jobDag = workflowCfg.getJobDag();
}
String nextJob = jobDag.getNextJob();
// Assign new jobs
while (nextJob != null) {
String job = nextJob;
TaskState jobState = workflowCtx.getJobState(job);
if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
LOG.debug("Job {} is already started or completed.", job);
processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
nextJob = jobDag.getNextJob();
continue;
}
if (workflowCfg.isJobQueue() && scheduledJobs >= workflowCfg.getParallelJobs()) {
LOG.debug(
"Workflow {} already has enough jobs in progress (scheduledJobs={}); stop scheduling more jobs",
workflow, scheduledJobs);
break;
}
// TODO: Part of isJobReadyToSchedule() is already done by RuntimeJobDag. Because there is
// some duplicate logic, consider refactoring. The check here and the ready-list in
// RuntimeJobDag may cause conflicts.
// check ancestor job status
if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap,
clusterDataCache, clusterDataCache.getAssignableInstanceManager())) {
JobConfig jobConfig = jobConfigMap.get(job);
long calculatedStartTime = computeStartTimeForJob(workflowCtx, job, jobConfig);
// Check whether the job's start time has arrived. If not, record the earliest time at which
// a rebalance should be triggered.
if (System.currentTimeMillis() >= workflowCtx.getJobStartTime(job)) {
workflowCtx.setJobState(job, TaskState.NOT_STARTED);
processJob(job, currentStateOutput, bestPossibleOutput, workflowCtx);
scheduledJobs++;
} else {
timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
}
}
nextJob = jobDag.getNextJob();
}
long currentScheduledTime =
_rebalanceScheduler.getRebalanceTime(workflow) == -1 ? Long.MAX_VALUE
: _rebalanceScheduler.getRebalanceTime(workflow);
if (timeToSchedule < currentScheduledTime) {
_rebalanceScheduler.scheduleRebalance(_manager, workflow, timeToSchedule);
}
}
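/**
 * Dispatch a single job: delegate the job status update and assignment computation to the
 * {@link JobDispatcher} and fold the resulting assignment into the best possible state output.
 */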
private void processJob(String job, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput, WorkflowContext workflowCtx) {
_clusterDataCache.getTaskDataCache().dispatchJob(job);
try {
ResourceAssignment resourceAssignment =
_jobDispatcher.processJobStatusUpdateAndAssignment(job, currentStateOutput, workflowCtx);
updateBestPossibleStateOutput(job, resourceAssignment, bestPossibleOutput);
} catch (Exception e) {
LogUtil.logWarn(LOG, _clusterDataCache.getClusterEventId(),
String.format("Failed to compute job assignment for job %s", job), e);
}
}
/**
* Jobs whose corresponding JobConfig, WorkflowConfig, or WorkflowContext is missing need to
* be dropped
*/
public void processJobForDrop(String resourceName, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleStateOutput) {
JobConfig jobConfig = _clusterDataCache.getJobConfig(resourceName);
if (jobConfig == null || _clusterDataCache.getWorkflowConfig(jobConfig.getWorkflow()) == null
|| _clusterDataCache.getWorkflowContext(jobConfig.getWorkflow()) == null) {
ResourceAssignment emptyAssignment = buildEmptyAssignment(resourceName, currentStateOutput);
updateBestPossibleStateOutput(resourceName, emptyAssignment, bestPossibleStateOutput);
}
}
/**
* Check if a workflow is ready to be scheduled, and schedule a rebalance if it is not
* @param workflow the Helix resource associated with the workflow
* @param workflowCfg the workflow configuration to check
* @param workflowCtx the current workflow context
* @param cache the task data cache used to look up workflow contexts
* @return true if the workflow is ready to be scheduled, false otherwise
*/
private boolean scheduleWorkflowIfReady(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, TaskDataCache cache) {
// A non-scheduled workflow is ready to run immediately.
if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
return true;
}
// Figure out when this should be run, and if it's ready, then just run it
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
Date startTime = scheduleConfig.getStartTime();
long currentTime = new Date().getTime();
long delayFromStart = startTime.getTime() - currentTime;
if (delayFromStart <= 0) {
// Recurring workflows are just templates that spawn new workflows
if (scheduleConfig.isRecurring()) {
// Skip scheduling this workflow if it's not in a start state
if (!workflowCfg.getTargetState().equals(TargetState.START)) {
LOG.debug("Skip scheduling since the workflow {} has not been started", workflow);
return false;
}
// Skip scheduling this workflow again if the previous run (if any) is still active
String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
if (lastScheduled != null) {
WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
if (lastWorkflowCtx != null
&& lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
LOG.info("Skip scheduling workflow {} since last schedule {} has not completed yet.",
workflow, lastScheduled);
return false;
}
}
// Figure out how many jumps are needed, thus the time to schedule the next workflow
// The negative of the delay is the amount of time past the start time
long period =
scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
long offsetMultiplier = (-delayFromStart) / period;
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
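// Worked example: with a start time of 00:00, a period of 1h and a current time of 02:30,
// delayFromStart is -2.5h, so offsetMultiplier = 2 and timeToSchedule = 02:00; the cloned
// workflow is stamped 02:00 and the next rebalance is scheduled for 03:00.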
// Now clone the workflow if this clone has not yet been created
DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
df.setTimeZone(TimeZone.getTimeZone("UTC"));
String newWorkflowName = workflow + "_" + df.format(new Date(timeToSchedule));
LOG.debug("Ready to start workflow {}", newWorkflowName);
if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
cloneWorkflow(_manager, workflow, newWorkflowName, new Date(timeToSchedule));
TaskDriver driver = new TaskDriver(_manager);
if (clonedWf != null) {
try {
// Start the cloned workflow
driver.start(clonedWf);
} catch (Exception e) {
LOG.error("Failed to schedule cloned workflow {}. ", newWorkflowName, e);
_clusterStatusMonitor
.updateWorkflowCounters(clonedWf.getWorkflowConfig(), TaskState.FAILED);
}
}
// Persist workflow start regardless of success to avoid retrying and failing
workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
}
// Change the time to trigger the pipeline to that of the next run
_rebalanceScheduler.scheduleRebalance(_manager, workflow, (timeToSchedule + period));
} else {
// One-time workflow.
// Remove any timers for this workflow that are already past their time.
long scheduledTime = _rebalanceScheduler.getRebalanceTime(workflow);
if (scheduledTime > 0 && currentTime > scheduledTime) {
_rebalanceScheduler.removeScheduledRebalance(workflow);
}
return true;
}
} else {
// set the timer to trigger future schedule
_rebalanceScheduler.scheduleRebalance(_manager, workflow, startTime.getTime());
}
return false;
}
/**
* Create a new workflow based on an existing one
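* <p>Illustrative usage (the names and the schedule time here are hypothetical):
* <pre>{@code
* Workflow cloned = WorkflowDispatcher.cloneWorkflow(manager, "myWorkflow",
*     "myWorkflow_20240101T000000", new Date(scheduledTimeMillis));
* }</pre>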
* @param manager connection to Helix
* @param origWorkflowName the name of the existing workflow
* @param newWorkflowName the name of the new workflow
* @param newStartTime the start time for the new workflow, used to build a one-time delayed schedule (may be null)
* @return the cloned workflow, or null if there was a problem cloning the existing one
*/
public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
String newWorkflowName, Date newStartTime) {
// Read all resources, including the workflow and jobs of interest
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
Map<String, HelixProperty> resourceConfigMap =
accessor.getChildValuesMap(keyBuilder.resourceConfigs(), true);
if (!resourceConfigMap.containsKey(origWorkflowName)) {
LOG.error("No such workflow named {}", origWorkflowName);
return null;
}
if (resourceConfigMap.containsKey(newWorkflowName)) {
LOG.error("Workflow with name {} already exists!", newWorkflowName);
return null;
}
// Create a new workflow with a new name
Map<String, String> workflowConfigsMap =
resourceConfigMap.get(origWorkflowName).getRecord().getSimpleFields();
WorkflowConfig.Builder workflowConfigBuilder = WorkflowConfig.Builder.fromMap(workflowConfigsMap);
// Set the schedule, if applicable
if (newStartTime != null) {
ScheduleConfig scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
workflowConfigBuilder.setScheduleConfig(scheduleConfig);
}
workflowConfigBuilder.setTerminable(true);
WorkflowConfig workflowConfig = workflowConfigBuilder.build();
JobDag jobDag = workflowConfig.getJobDag();
Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
Workflow.Builder workflowBuilder = new Workflow.Builder(newWorkflowName);
workflowBuilder.setWorkflowConfig(workflowConfig);
// Add each job back as long as the original exists
Set<String> namespacedJobs = jobDag.getAllNodes();
for (String namespacedJob : namespacedJobs) {
if (resourceConfigMap.containsKey(namespacedJob)) {
// Copy over job-level and task-level configs
String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobSimpleFields);
jobCfgBuilder.setWorkflow(newWorkflowName); // overwrite workflow name
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
taskConfigs.add(taskConfig);
}
jobCfgBuilder.addTaskConfigs(taskConfigs);
workflowBuilder.addJob(job, jobCfgBuilder);
// Add dag dependencies
Set<String> children = parentsToChildren.get(namespacedJob);
if (children != null) {
for (String namespacedChild : children) {
String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
workflowBuilder.addParentChildDependency(job, child);
}
}
}
}
return workflowBuilder.build();
}
/**
* Clean up a workflow. This removes the workflow config, IdealState, ExternalView and workflow
* context associated with this workflow, and all job information, including the jobs' configs,
* contexts, IdealStates and ExternalViews.
*/
private void cleanupWorkflow(String workflow) {
LOG.info("Cleaning up workflow: " + workflow);
WorkflowConfig workflowCfg = _clusterDataCache.getWorkflowConfig(workflow);
if (workflowCfg.isTerminable() || workflowCfg.getTargetState() == TargetState.DELETE) {
Set<String> jobs = workflowCfg.getJobDag().getAllNodes();
// Remove all pending timer tasks for this workflow, if any exist
_rebalanceScheduler.removeScheduledRebalance(workflow);
for (String job : jobs) {
_rebalanceScheduler.removeScheduledRebalance(job);
}
if (!TaskUtil.removeWorkflow(_manager.getHelixDataAccessor(),
_manager.getHelixPropertyStore(), workflow, jobs)) {
LOG.warn("Failed to clean up workflow {}", workflow);
} else {
// Only remove from the cache when the whole workflow removal succeeds. Otherwise, the batch
// write would clean up all the contexts even though the configs and IdealStates still exist,
// and all the workflows and jobs would be rescheduled again.
removeContexts(workflow, jobs, _clusterDataCache.getTaskDataCache());
}
} else {
LOG.info(
"Did not clean up workflow {} because it is neither terminable nor marked for DELETE.",
workflow);
}
}
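/**
 * Remove the contexts of the given jobs, and of the workflow itself, from the task data cache.
 */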
private void removeContexts(String workflow, Set<String> jobs, TaskDataCache cache) {
if (jobs != null) {
for (String job : jobs) {
cache.removeContext(job);
}
}
cache.removeContext(workflow);
}
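/**
 * Compute the start time for a job, honoring any configured execution delay and execution
 * start time, and persist it in the workflow context so it is only computed once.
 */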
private long computeStartTimeForJob(WorkflowContext workflowCtx, String job,
JobConfig jobConfig) {
// Since the start time is calculated based on the completion time of this job's parent jobs,
// it should only be calculated once. Persist the calculated time in the WorkflowContext ZNode.
long calculatedStartTime = workflowCtx.getJobStartTime(job);
if (calculatedStartTime < 0) {
// The start time has not been calculated yet, so do the math.
calculatedStartTime = System.currentTimeMillis();
if (jobConfig.getExecutionDelay() >= 0) {
calculatedStartTime += jobConfig.getExecutionDelay();
}
calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
workflowCtx.setJobStartTime(job, calculatedStartTime);
}
return calculatedStartTime;
}
}