package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.base.Joiner;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Custom rebalancer implementation for the {@code Task} state model.
* This rebalancer is deprecated and is kept only for backward compatibility.
*/
@Deprecated
public abstract class DeprecatedTaskRebalancer
implements Rebalancer<WorkflowControllerDataProvider>,
MappingCalculator<WorkflowControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(TaskRebalancer.class);
// Management of already-scheduled rebalances across jobs
private static final BiMap<String, Date> SCHEDULED_TIMES = HashBiMap.create();
private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors
.newSingleThreadScheduledExecutor();
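// Name of the property store node, under the rebalancer context root, where the previous
// resource assignment for a job is persisted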
public static final String PREV_RA_NODE = "PreviousResourceAssignment";
// For connection management
private HelixManager _manager;
/**
* Get all the partitions that should be created by this task
* @param jobCfg the job configuration
* @param jobCtx the job context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param cache cluster snapshot
* @return set of partition numbers
*/
public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache);
/**
* Compute an assignment of tasks to instances
* @param currStateOutput the current state of the instances
* @param prevAssignment the previous task partition assignment
* @param instances the instances
* @param jobCfg the job configuration
* @param jobContext the job context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param partitionSet the partitions to assign
* @param cache cluster snapshot
* @return map of instances to set of partition numbers
*/
public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
WorkflowControllerDataProvider cache);
@Override
public void init(HelixManager manager) {
_manager = manager;
}
@Override
public ResourceAssignment computeBestPossiblePartitionState(WorkflowControllerDataProvider clusterData,
IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
final String resourceName = resource.getResourceName();
LOG.debug("Compute best partition for resource: " + resourceName);
// Fetch job configuration
JobConfig jobCfg = (JobConfig) clusterData.getResourceConfig(resourceName);
if (jobCfg == null) {
LOG.debug("Job configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
}
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
WorkflowConfig workflowCfg = clusterData.getWorkflowConfig(workflowResource);
if (workflowCfg == null) {
LOG.debug("Workflow configuration is NULL for " + resourceName);
return emptyAssignment(resourceName, currStateOutput);
}
WorkflowContext workflowCtx = clusterData.getWorkflowContext(workflowResource);
// Initialize workflow context if needed
if (workflowCtx == null) {
workflowCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
workflowCtx.setStartTime(System.currentTimeMillis());
workflowCtx.setName(workflowResource);
LOG.info("Workflow context for " + resourceName + " created!");
}
// check ancestor job status
int notStartedCount = 0;
int inCompleteCount = 0;
for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
TaskState jobState = workflowCtx.getJobState(ancestor);
if (jobState == null || jobState == TaskState.NOT_STARTED) {
++notStartedCount;
} else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
++inCompleteCount;
}
}
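// A job is schedulable only once all of its ancestor jobs have started; for job queues, the
// number of incomplete ancestor jobs must additionally stay below the configured parallelism.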
if (notStartedCount > 0 || (workflowCfg.isJobQueue() && inCompleteCount >= workflowCfg
.getParallelJobs())) {
LOG.debug("Job " + resourceName + " is not ready to be scheduled because jobs it depends on are still pending.");
return emptyAssignment(resourceName, currStateOutput);
}
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.info("Workflow " + workflowResource
+ " is marked for deletion; cleaning up the workflow context.");
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName, currStateOutput);
}
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
&& workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
LOG.info("Workflow " + workflowResource
+ " is completed and passed expiry time, cleaning up the workflow context.");
markForDeletion(_manager, workflowResource);
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName, currStateOutput);
}
// Fetch any existing context information from the property store.
JobContext jobCtx = clusterData.getJobContext(resourceName);
if (jobCtx == null) {
jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
jobCtx.setStartTime(System.currentTimeMillis());
jobCtx.setName(resourceName);
}
// Check for expired jobs for non-terminable workflows
long jobFinishTime = jobCtx.getFinishTime();
if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
&& jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
LOG.info("Job " + resourceName
+ " is completed and passed expiry time, cleaning up the job context.");
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName, currStateOutput);
}
// The job is already in a final state (completed/failed).
if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
|| workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
LOG.debug("Job " + resourceName + " is failed or already completed.");
return emptyAssignment(resourceName, currStateOutput);
}
// Check for readiness, and stop processing if it's not ready
boolean isReady =
scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
if (!isReady) {
LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
return emptyAssignment(resourceName, currStateOutput);
}
// Fetch the previous resource assignment from the property store. This is required because of
// HELIX-230. Grab the old assignment, or an empty one if it doesn't exist.
ResourceAssignment prevAssignment = getPrevResourceAssignment(_manager, resourceName);
if (prevAssignment == null) {
prevAssignment = new ResourceAssignment(resourceName);
}
// Will contain the list of partitions that must be explicitly dropped from the ideal state that
// is stored in zk.
Set<Integer> partitionsToDrop = new TreeSet<Integer>();
ResourceAssignment newAssignment =
computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
.getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
clusterData);
if (!partitionsToDrop.isEmpty()) {
for (Integer pId : partitionsToDrop) {
taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
}
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
accessor.setProperty(propertyKey, taskIs);
}
// Update Workflow and Job context in data cache and ZK.
clusterData.updateJobContext(resourceName, jobCtx);
clusterData.updateWorkflowContext(workflowResource, workflowCtx);
setPrevResourceAssignment(_manager, resourceName, newAssignment);
LOG.debug("Job " + resourceName + " new assignment " + Arrays
.toString(newAssignment.getMappedPartitions().toArray()));
return newAssignment;
}
/**
* Get the last task assignment for a given job
* @param manager a connection to Helix
* @param resourceName the name of the job
* @return {@link ResourceAssignment} instance, or null if no assignment is available
*/
private ResourceAssignment getPrevResourceAssignment(HelixManager manager,
String resourceName) {
ZNRecord r =
manager.getHelixPropertyStore().get(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
null, AccessOption.PERSISTENT);
return r != null ? new ResourceAssignment(r) : null;
}
/**
* Set the last task assignment for a given job
* @param manager a connection to Helix
* @param resourceName the name of the job
* @param ra {@link ResourceAssignment} containing the task assignment
*/
public void setPrevResourceAssignment(HelixManager manager, String resourceName,
ResourceAssignment ra) {
manager.getHelixPropertyStore().set(
Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
ra.getRecord(), AccessOption.PERSISTENT);
}
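/**
* Collect the instances that currently hold INIT or RUNNING task partitions for any other job in
* the same workflow; these instances are excluded when assigning this job's tasks.
*/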
private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
WorkflowConfig workflowCfg, WorkflowControllerDataProvider cache) {
Set<String> ret = new HashSet<String>();
for (String jobName : workflowCfg.getJobDag().getAllNodes()) {
if (jobName.equals(currentJobName)) {
continue;
}
JobContext jobContext = cache.getJobContext(jobName);
if (jobContext == null) {
continue;
}
for (int partition : jobContext.getPartitionSet()) {
TaskPartitionState partitionState = jobContext.getPartitionState(partition);
if (partitionState == TaskPartitionState.INIT ||
partitionState == TaskPartitionState.RUNNING) {
ret.add(jobContext.getAssignedParticipant(partition));
}
}
}
return ret;
}
private ResourceAssignment computeResourceMapping(String jobResource,
WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
Collection<String> liveInstances, CurrentStateOutput currStateOutput,
WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
WorkflowControllerDataProvider cache) {
TargetState jobTgtState = workflowConfig.getTargetState();
// Update running status in workflow context
if (jobTgtState == TargetState.STOP) {
workflowCtx.setJobState(jobResource, TaskState.STOPPED);
// Workflow has been stopped if all jobs are stopped
if (isWorkflowStopped(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
}
} else {
workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
// Workflow is in progress if any task is in progress
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
}
// Used to keep track of tasks that have already been assigned to instances.
Set<Integer> assignedPartitions = new HashSet<Integer>();
// Used to keep track of tasks that have failed, but whose failure is acceptable
Set<Integer> skippedPartitions = new HashSet<Integer>();
// Keeps a mapping of (partition) -> (instance, state)
Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
Set<String> excludedInstances =
getInstancesAssignedToOtherJobs(jobResource, workflowConfig, cache);
// Process all the current assignments of tasks.
Set<Integer> allPartitions =
getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
Map<String, SortedSet<Integer>> taskAssignments =
getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
long currentTime = System.currentTimeMillis();
for (String instance : taskAssignments.keySet()) {
if (excludedInstances.contains(instance)) {
continue;
}
Set<Integer> pSet = taskAssignments.get(instance);
// Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
// TASK_ERROR, ERROR.
Set<Integer> donePartitions = new TreeSet<Integer>();
for (int pId : pSet) {
final String pName = pName(jobResource, pId);
// Check for pending state transitions on this (partition, instance).
Message pendingMessage =
currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
if (pendingMessage != null) {
// There is a pending state transition for this (partition, instance). Just copy forward
// the state assignment from the previous ideal state.
Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
if (stateMap != null) {
String prevState = stateMap.get(instance);
paMap.put(pId, new PartitionAssignment(instance, prevState));
assignedPartitions.add(pId);
if (LOG.isDebugEnabled()) {
LOG.debug(String
.format(
"Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
pName, instance, prevState));
}
}
continue;
}
TaskPartitionState currState =
TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
pName), instance));
jobCtx.setPartitionState(pId, currState);
// Process any requested state transitions.
String requestedStateStr =
currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
if (requestedState.equals(currState)) {
LOG.warn(String.format(
"Requested state %s is the same as the current state for instance %s.",
requestedState, instance));
}
paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
assignedPartitions.add(pId);
LOG.debug(String.format(
"Instance %s requested a state transition to %s for partition %s.", instance,
requestedState, pName));
continue;
}
switch (currState) {
case RUNNING:
case STOPPED: {
TaskPartitionState nextState;
if (jobTgtState == TargetState.START) {
nextState = TaskPartitionState.RUNNING;
} else {
nextState = TaskPartitionState.STOPPED;
}
paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
assignedPartitions.add(pId);
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
nextState, instance));
}
break;
case COMPLETED: {
// The task has completed on this partition. Mark as such in the context object.
donePartitions.add(pId);
LOG.debug(String
.format(
"Task partition %s has completed with state %s. Marking as such in rebalancer context.",
pName, currState));
partitionsToDropFromIs.add(pId);
markPartitionCompleted(jobCtx, pId);
}
break;
case TIMED_OUT:
case TASK_ERROR:
case ERROR: {
donePartitions.add(pId); // The task may be rescheduled on a different instance.
LOG.debug(String.format(
"Task partition %s has error state %s. Marking as such in rebalancer context.",
pName, currState));
markPartitionError(jobCtx, pId, currState, true);
// The error policy is to fail the task as soon as a single partition fails for a specified
// maximum number of attempts.
if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
// If the user does not require this task to succeed in order for the job to succeed,
// then we don't have to fail the job right now
boolean successOptional = false;
String taskId = jobCtx.getTaskIdForPartition(pId);
if (taskId != null) {
TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
if (taskConfig != null) {
successOptional = taskConfig.isSuccessOptional();
}
}
// Similarly, if we have some leeway for how many tasks we can fail, then we don't have
// to fail the job immediately
if (skippedPartitions.size() < jobCfg.getFailureThreshold()) {
successOptional = true;
}
if (!successOptional) {
long finishTime = currentTime;
workflowCtx.setJobState(jobResource, TaskState.FAILED);
if (workflowConfig.isTerminable()) {
workflowCtx.setWorkflowState(TaskState.FAILED);
workflowCtx.setFinishTime(finishTime);
}
jobCtx.setFinishTime(finishTime);
markAllPartitionsError(jobCtx, currState, false);
addAllPartitions(allPartitions, partitionsToDropFromIs);
return emptyAssignment(jobResource, currStateOutput);
} else {
skippedPartitions.add(pId);
partitionsToDropFromIs.add(pId);
}
} else {
// Mark the task to be started at some later time (if enabled)
markPartitionDelayed(jobCfg, jobCtx, pId);
}
}
break;
case INIT:
case DROPPED: {
// currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
donePartitions.add(pId);
LOG.debug(String.format(
"Task partition %s has state %s. It will be dropped from the current ideal state.",
pName, currState));
}
break;
default:
throw new AssertionError("Unknown enum symbol: " + currState);
}
}
// Remove the set of task partitions that are completed or in one of the error states.
pSet.removeAll(donePartitions);
}
// For delayed tasks, trigger a rebalance event for the closest upcoming ready time
scheduleForNextTask(jobResource, jobCtx, currentTime);
if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
jobCtx.setFinishTime(currentTime);
if (isWorkflowComplete(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.COMPLETED);
workflowCtx.setFinishTime(currentTime);
}
}
// Make additional task assignments if needed.
if (jobTgtState == TargetState.START) {
// Contains the set of task partitions that must be excluded from consideration when making
// any new assignments.
// This includes all completed, failed, delayed, and already assigned partitions.
Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
addCompletedPartitions(excludeSet, jobCtx, allPartitions);
addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
excludeSet.addAll(skippedPartitions);
excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments =
getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
workflowConfig, workflowCtx, allPartitions, cache);
for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
String instance = entry.getKey();
if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances.contains(instance)) {
continue;
}
// Contains the set of task partitions currently assigned to the instance.
Set<Integer> pSet = entry.getValue();
int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
if (numToAssign > 0) {
List<Integer> nextPartitions =
getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
for (Integer pId : nextPartitions) {
String pName = pName(jobResource, pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
jobCtx.setAssignedParticipant(pId, instance);
jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
TaskPartitionState.RUNNING, instance));
}
}
}
}
// Construct a ResourceAssignment object from the map of partition assignments.
ResourceAssignment ra = new ResourceAssignment(jobResource);
for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
PartitionAssignment pa = e.getValue();
ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())),
ImmutableMap.of(pa._instance, pa._state));
}
return ra;
}
/**
* Check if a workflow is ready to schedule, and schedule a rebalance if it is not
* @param workflowCfg the workflow to check
* @param workflowCtx the current workflow context
* @param workflowResource the Helix resource associated with the workflow
* @param jobResource a job from the workflow
* @param cache the current snapshot of the cluster
* @return true if ready, false if not ready
*/
private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
String workflowResource, String jobResource, WorkflowControllerDataProvider cache) {
// Ignore non-scheduled workflows
if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) {
return true;
}
// Figure out when this should be run, and if it's ready, then just run it
ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig();
Date startTime = scheduleConfig.getStartTime();
long currentTime = new Date().getTime();
long delayFromStart = startTime.getTime() - currentTime;
if (delayFromStart <= 0) {
// Remove any timers that are past-time for this workflow
Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
LOG.debug("Remove schedule timer for " + workflowResource + " time: " + scheduledTime);
SCHEDULED_TIMES.remove(workflowResource);
}
// Recurring workflows are just templates that spawn new workflows
if (scheduleConfig.isRecurring()) {
// Skip scheduling this workflow if it's not in a start state
if (!workflowCfg.getTargetState().equals(TargetState.START)) {
LOG.debug(
"Skip scheduling since the workflow has not been started " + workflowResource);
return false;
}
// Skip scheduling this workflow again if the previous run (if any) is still active
String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
if (lastScheduled != null) {
WorkflowContext lastWorkflowCtx = cache.getWorkflowContext(lastScheduled);
if (lastWorkflowCtx != null
&& lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
return false;
}
}
// Figure out how many jumps are needed, thus the time to schedule the next workflow
// The negative of the delay is the amount of time past the start time
long period =
scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval());
long offsetMultiplier = (-delayFromStart) / period;
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
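// timeToSchedule is the most recent recurrence boundary that is not after the current time.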
// Now clone the workflow if this clone has not yet been created
DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
LOG.debug("Ready to start workflow " + newWorkflowName);
if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(timeToSchedule));
TaskDriver driver = new TaskDriver(_manager);
try {
// Start the cloned workflow
driver.start(clonedWf);
} catch (Exception e) {
LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
}
// Persist workflow start regardless of success to avoid retrying and failing
workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
cache.updateWorkflowContext(workflowResource, workflowCtx);
}
// Change the time to trigger the pipeline to that of the next run
startTime = new Date(timeToSchedule + period);
delayFromStart = startTime.getTime() - System.currentTimeMillis();
} else {
// This is a one-time workflow and is ready
return true;
}
}
scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart);
return false;
}
/**
* Create a new workflow based on an existing one
* @param manager connection to Helix
* @param origWorkflowName the name of the existing workflow
* @param newWorkflowName the name of the new workflow
* @param newStartTime the start time to use for the new workflow, or null to reuse the original schedule
* @return the cloned workflow, or null if there was a problem cloning the existing one
*/
private Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
String newWorkflowName, Date newStartTime) {
// Read all resources, including the workflow and jobs of interest
HelixDataAccessor accessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
Map<String, HelixProperty> resourceConfigMap =
accessor.getChildValuesMap(keyBuilder.resourceConfigs());
if (!resourceConfigMap.containsKey(origWorkflowName)) {
LOG.error("No such workflow named " + origWorkflowName);
return null;
}
if (resourceConfigMap.containsKey(newWorkflowName)) {
LOG.error("Workflow with name " + newWorkflowName + " already exists!");
return null;
}
// Create a new workflow with a new name
HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
JobDag jobDag =
JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
Workflow.Builder builder = new Workflow.Builder(newWorkflowName);
// Set the workflow expiry
builder.setExpiry(
Long.parseLong(wfSimpleFields.get(WorkflowConfig.WorkflowConfigProperty.Expiry.name())));
// Set the schedule, if applicable
ScheduleConfig scheduleConfig;
if (newStartTime != null) {
scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
} else {
scheduleConfig = WorkflowConfig.parseScheduleFromConfigMap(wfSimpleFields);
}
if (scheduleConfig != null) {
builder.setScheduleConfig(scheduleConfig);
}
// Add each job back as long as the original exists
Set<String> namespacedJobs = jobDag.getAllNodes();
for (String namespacedJob : namespacedJobs) {
if (resourceConfigMap.containsKey(namespacedJob)) {
// Copy over job-level and task-level configs
String job = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedJob);
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
jobSimpleFields.put(JobConfig.JobConfigProperty.WorkflowID.name(), newWorkflowName); // overwrite workflow name
for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
builder.addConfig(job, e.getKey(), e.getValue());
}
Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
List<TaskConfig> taskConfigs = Lists.newLinkedList();
for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
TaskConfig taskConfig = TaskConfig.Builder.from(rawTaskConfig);
taskConfigs.add(taskConfig);
}
builder.addTaskConfigs(job, taskConfigs);
// Add dag dependencies
Set<String> children = parentsToChildren.get(namespacedJob);
if (children != null) {
for (String namespacedChild : children) {
String child = TaskUtil.getDenamespacedJobName(origWorkflowName, namespacedChild);
builder.addParentChildDependency(job, child);
}
}
}
}
return builder.build();
}
private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) {
// Do nothing if there is already a timer set for this workflow with the same start time.
if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
|| SCHEDULED_TIMES.inverse().containsKey(startTime)) {
LOG.debug("Schedule timer for " + id + " and job: " + jobResource + " is up to date.");
return;
}
LOG.info(
"Schedule rebalance with id: " + id + " and job: " + jobResource + " at time: " + startTime
+ " delay from start: " + delayFromStart);
// For workflows not yet scheduled, schedule them and record it
RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
SCHEDULED_TIMES.put(id, startTime);
SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS);
}
private void scheduleForNextTask(String jobResource, JobContext ctx, long now) {
// Clear current entries if they exist and are expired
long currentTime = now;
Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
LOG.debug(
"Remove schedule timer for " + jobResource + " time: " + scheduledTime);
SCHEDULED_TIMES.remove(jobResource);
}
// Figure out the earliest schedulable time in the future of a non-complete job
boolean shouldSchedule = false;
long earliestTime = Long.MAX_VALUE;
for (int p : ctx.getPartitionSet()) {
long retryTime = ctx.getNextRetryTime(p);
TaskPartitionState state = ctx.getPartitionState(p);
state = (state != null) ? state : TaskPartitionState.INIT;
Set<TaskPartitionState> errorStates =
Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR,
TaskPartitionState.TIMED_OUT);
if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) {
earliestTime = retryTime;
shouldSchedule = true;
}
}
// If any was found, then schedule it
if (shouldSchedule) {
long delay = earliestTime - currentTime;
Date startTime = new Date(earliestTime);
scheduleRebalance(jobResource, jobResource, startTime, delay);
}
}
/**
* Checks if the job has completed.
* @param ctx The rebalancer context.
* @param allPartitions The set of partitions to check.
* @param skippedPartitions partitions that failed, but whose failure is acceptable
* @return true if every task partition is {@link TaskPartitionState#COMPLETED} in the rebalancer
* context, was skipped, or has exhausted its allowed attempts; false otherwise.
*/
private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions,
Set<Integer> skippedPartitions, JobConfig cfg) {
for (Integer pId : allPartitions) {
TaskPartitionState state = ctx.getPartitionState(pId);
if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED
&& !isTaskGivenup(ctx, cfg, pId)) {
return false;
}
}
return true;
}
/**
* Checks if the workflow has completed.
* @param ctx Workflow context containing job states
* @param cfg Workflow config containing set of jobs
* @return returns true if the workflow is terminable and all jobs are {@link TaskState#COMPLETED}, false otherwise.
*/
private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
if (!cfg.isTerminable()) {
return false;
}
for (String job : cfg.getJobDag().getAllNodes()) {
if (ctx.getJobState(job) != TaskState.COMPLETED) {
return false;
}
}
return true;
}
/**
* Checks if the workflow has been stopped.
* @param ctx Workflow context containing job states
* @param cfg Workflow config containing the set of jobs
* @return returns true if every job is {@link TaskState#STOPPED} or has no recorded state, false otherwise.
*/
private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
for (String job : cfg.getJobDag().getAllNodes()) {
if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
return false;
}
}
return true;
}
private static void markForDeletion(HelixManager mgr, String resourceName) {
mgr.getConfigAccessor().set(
TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
WorkflowConfig.WorkflowConfigProperty.TargetState.name(), TargetState.DELETE.name());
}
/**
* Cleans up all Helix state associated with this job, wiping workflow-level information if this
* is the last remaining job in its workflow, and the workflow is terminable.
*/
private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
String workflowResource) {
LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource);
HelixDataAccessor accessor = mgr.getHelixDataAccessor();
// Remove any DAG references in workflow
PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
JobDag jobDag = JobDag
.fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
for (String child : jobDag.getDirectChildren(resourceName)) {
jobDag.getChildrenToParents().get(child).remove(resourceName);
}
for (String parent : jobDag.getDirectParents(resourceName)) {
jobDag.getParentsToChildren().get(parent).remove(resourceName);
}
jobDag.getChildrenToParents().remove(resourceName);
jobDag.getParentsToChildren().remove(resourceName);
jobDag.getAllNodes().remove(resourceName);
try {
currentData
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
LOG.error("Could not update DAG for job: " + resourceName, e);
}
return currentData;
}
};
accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
AccessOption.PERSISTENT);
// Delete resource configs.
PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
if (!accessor.removeProperty(cfgKey)) {
throw new RuntimeException(String.format(
"Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
resourceName,
cfgKey));
}
// Delete property store information for this resource.
// For recurring workflow, it's OK if the node doesn't exist.
String propStoreKey = getRebalancerPropStoreKey(resourceName);
mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT);
// Delete the ideal state itself.
PropertyKey isKey = getISPropertyKey(accessor, resourceName);
if (!accessor.removeProperty(isKey)) {
throw new RuntimeException(String.format(
"Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
resourceName, isKey));
}
// Delete dead external view
// because job is already completed, there is no more current state change
// thus dead external views removal will not be triggered
PropertyKey evKey = accessor.keyBuilder().externalView(resourceName);
accessor.removeProperty(evKey);
LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
boolean lastInWorkflow = true;
for (String job : cfg.getJobDag().getAllNodes()) {
// check if property store information or resource configs exist for this job
if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
AccessOption.PERSISTENT)
|| accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
|| accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
lastInWorkflow = false;
break;
}
}
// clean up workflow-level info if this was the last in workflow
if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) {
// delete workflow config
PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
if (!accessor.removeProperty(workflowCfgKey)) {
throw new RuntimeException(
String
.format(
"Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
workflowResource, workflowCfgKey));
}
// Delete property store information for this workflow
String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
throw new RuntimeException(
String
.format(
"Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
workflowResource, workflowPropStoreKey));
}
// Remove pending timer for this workflow if exists
if (SCHEDULED_TIMES.containsKey(workflowResource)) {
SCHEDULED_TIMES.remove(workflowResource);
}
}
}
private static String getRebalancerPropStoreKey(String resource) {
return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
}
private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
return accessor.keyBuilder().idealStates(resource);
}
private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
return accessor.keyBuilder().resourceConfig(resource);
}
private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
destination.addAll(toAdd);
}
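// Build an assignment that marks every currently mapped replica of this resource as DROPPED.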
private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) {
ResourceAssignment assignment = new ResourceAssignment(name);
Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name);
for (Partition partition : partitions) {
Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition);
Map<String, String> replicaMap = Maps.newHashMap();
for (String instanceName : currentStateMap.keySet()) {
replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString());
}
assignment.addReplicaMap(partition, replicaMap);
}
return assignment;
}
private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
Iterable<Integer> pIds) {
for (Integer pId : pIds) {
TaskPartitionState state = ctx.getPartitionState(pId);
if (state == TaskPartitionState.COMPLETED) {
set.add(pId);
}
}
}
private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
}
// Add all partitions whose task has already been attempted the maximum allowed number of times.
private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds,
JobConfig cfg) {
for (Integer pId : pIds) {
if (isTaskGivenup(ctx, cfg, pId)) {
set.add(pId);
}
}
}
private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
Set<Integer> excluded, int n) {
List<Integer> result = new ArrayList<Integer>();
for (Integer pId : candidatePartitions) {
if (result.size() >= n) {
break;
}
if (!excluded.contains(pId)) {
result.add(pId);
}
}
return result;
}
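// Record the earliest time at which the partition may be retried, based on the configured task retry delay.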
private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) {
long delayInterval = cfg.getTaskRetryDelay();
if (delayInterval <= 0) {
return;
}
long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval;
ctx.setNextRetryTime(p, nextStartTime);
}
private static void markPartitionCompleted(JobContext ctx, int pId) {
ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
ctx.incrementNumAttempts(pId);
}
private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
boolean incrementAttempts) {
ctx.setPartitionState(pId, state);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
if (incrementAttempts) {
ctx.incrementNumAttempts(pId);
}
}
private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
boolean incrementAttempts) {
for (int pId : ctx.getPartitionSet()) {
markPartitionError(ctx, pId, state, incrementAttempts);
}
}
/**
* Return the assignment of task partitions per instance.
*/
private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
for (String instance : instanceList) {
result.put(instance, new TreeSet<Integer>());
}
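// Walk the previous assignment; for each partition in the include set, record it against every
// live instance it was mapped to.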
for (Partition partition : assignment.getMappedPartitions()) {
int pId = pId(partition.getPartitionName());
if (includeSet.contains(pId)) {
Map<String, String> replicaMap = assignment.getReplicaMap(partition);
for (String instance : replicaMap.keySet()) {
SortedSet<Integer> pList = result.get(instance);
if (pList != null) {
pList.add(pId);
}
}
}
}
return result;
}
private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) {
Set<Integer> nonReadyPartitions = Sets.newHashSet();
for (int p : ctx.getPartitionSet()) {
long toStart = ctx.getNextRetryTime(p);
if (now < toStart) {
nonReadyPartitions.add(p);
}
}
return nonReadyPartitions;
}
/**
* Computes the partition name given the resource name and partition id,
* e.g. {@code pName("myJob", 3)} returns {@code "myJob_3"}.
*/
protected static String pName(String resource, int pId) {
return resource + "_" + pId;
}
/**
* Extracts the partition id from the given partition name.
*/
protected static int pId(String pName) {
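// Partition names are built by pName() as <resource>_<id>, so the id is the last '_'-separated token.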
String[] tokens = pName.split("_");
return Integer.valueOf(tokens[tokens.length - 1]);
}
/**
* An (instance, state) pair.
*/
private static class PartitionAssignment {
private final String _instance;
private final String _state;
private PartitionAssignment(String instance, String state) {
_instance = instance;
_state = state;
}
}
@Override
public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, WorkflowControllerDataProvider clusterData) {
// All of the heavy lifting is in the ResourceAssignment computation,
// so this part can just be a no-op.
return currentIdealState;
}
/**
* The simplest possible runnable that will trigger a run of the controller pipeline
*/
private static class RebalanceInvoker implements Runnable {
private final HelixManager _manager;
private final String _resource;
public RebalanceInvoker(HelixManager manager, String resource) {
_manager = manager;
_resource = resource;
}
@Override
public void run() {
RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), _resource);
}
}
}