package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* CLI for scheduling/canceling workflows
*/
public class TaskDriver {
public enum DriverCommand {
start,
stop,
delete,
resume,
list,
flush,
clean
}
/** For logging */
private static final Logger LOG = LoggerFactory.getLogger(TaskDriver.class);
/** Default timeout for monitoring workflow or job state */
private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
// HELIX-619 This is a temporary solution for the issue of having too many ZK nodes.
// Limit workflow/job creation to prevent the problem.
//
// Note that this limitation should be smaller than the ZK capacity. If the current node count
// already exceeds the cap, the verification method will not throw an exception, since the
// getChildNames() call will return an empty list.
//
// TODO Implement or configure the limitation in ZK server.
private final static long DEFAULT_CONFIGS_LIMITATION =
HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 100000L);
private final static String TASK_START_TIME_KEY = "START_TIME";
protected long _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
private final HelixDataAccessor _accessor;
private final HelixPropertyStore<ZNRecord> _propertyStore;
private final HelixAdmin _admin;
private final String _clusterName;
public TaskDriver(HelixManager manager) {
this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(),
manager.getHelixPropertyStore(), manager.getClusterName());
}
public TaskDriver(HelixZkClient client, String clusterName) {
this(client, new ZkBaseDataAccessor<>(client), clusterName);
}
public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName),
null),
clusterName);
}
@Deprecated
public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
this(admin, accessor, propertyStore, clusterName);
}
public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor,
HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
_admin = admin;
_accessor = accessor;
_propertyStore = propertyStore;
_clusterName = clusterName;
}
/**
* Schedules a new workflow
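* Example (a minimal sketch; "myWorkflow", "myJob", and the task command "MyTask" are
* hypothetical placeholders for names registered in your cluster):
* TaskDriver driver = new TaskDriver(manager);
* Workflow.Builder builder = new Workflow.Builder("myWorkflow");
* JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("MyTask");
* builder.addJob("myJob", jobBuilder);
* driver.start(builder.build());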
* @param flow
*/
public void start(Workflow flow) {
LOG.info("Starting workflow " + flow.getName());
flow.validate();
validateZKNodeLimitation(flow.getJobConfigs().keySet().size() + 1);
WorkflowConfig newWorkflowConfig =
new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build();
Map<String, String> jobTypes = new HashMap<>();
// add all job configs.
for (String job : flow.getJobConfigs().keySet()) {
JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job));
}
JobConfig jobCfg = jobCfgBuilder.build();
if (jobCfg.getJobType() != null) {
jobTypes.put(job, jobCfg.getJobType());
}
addJobConfig(job, jobCfg);
}
newWorkflowConfig.setJobTypes(jobTypes);
// add workflow config.
if (!TaskUtil.createWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
// workflow config creation failed; try to roll back the job configs that were added
Set<String> failedJobRemoval = new HashSet<>();
for (String job : flow.getJobConfigs().keySet()) {
if (!TaskUtil.removeJobConfig(_accessor, job)) {
failedJobRemoval.add(job);
}
}
throw new HelixException(String.format(
"Failed to add workflow configuration for workflow %s. It's possible that a workflow of the same name already exists or there was a connection issue. JobConfig deletion attempted but failed for the following jobs: %s",
flow.getName(), failedJobRemoval));
}
// Finally add workflow resource.
addWorkflowResource(flow.getName());
}
/**
* Update the configuration of a non-terminable workflow (queue).
* A terminable workflow's configuration is not allowed to change once created.
* Note:
* For a recurrent workflow, the currently running schedule will not be affected;
* the new configuration will be applied to the next scheduled run of the workflow.
* For a non-recurrent workflow, the new configuration may (or may not) be applied
* to the currently running jobs, but it will be applied to the following unscheduled jobs.
* Example:
* _driver = new TaskDriver ...
* WorkflowConfig currentWorkflowConfig = _driver.getWorkflowConfig(workflow);
* WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
* // make needed changes to the config here
* configBuilder.setXXX();
* // update workflow configuration
* _driver.updateWorkflow(workflow, configBuilder.build());
* @param workflow
* @param newWorkflowConfig
*/
public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) {
if (newWorkflowConfig.getWorkflowId() == null || newWorkflowConfig.getWorkflowId().isEmpty()) {
newWorkflowConfig.getRecord()
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), workflow);
}
if (workflow == null || !workflow.equals(newWorkflowConfig.getWorkflowId())) {
throw new HelixException(String.format(
"Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", workflow,
newWorkflowConfig.getWorkflowId()));
}
WorkflowConfig currentConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
if (currentConfig == null) {
throw new HelixException("Workflow " + workflow + " does not exist!");
}
if (currentConfig.isTerminable()) {
throw new HelixException(
"Workflow " + workflow + " is terminable, not allow to change its configuration!");
}
// Should not let the user change the DAG of the workflow
newWorkflowConfig.setJobDag(currentConfig.getJobDag());
if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) {
LOG.error("Failed to update workflow configuration for workflow " + workflow);
}
RebalanceScheduler.invokeRebalance(_accessor, workflow);
}
/**
* Creates a new named job queue (workflow)
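* Example (a minimal sketch; "myQueue" is a hypothetical placeholder):
* JobQueue.Builder queueBuilder = new JobQueue.Builder("myQueue");
* driver.createQueue(queueBuilder.build());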
* @param queue
*/
public void createQueue(JobQueue queue) {
start(queue);
}
/**
* Remove all completed or failed jobs in a job queue
* Same as {@link #cleanupQueue(String)}
* @param queue name of the queue
*/
public void flushQueue(String queue) {
cleanupQueue(queue);
}
/**
* Delete a job from an existing named queue.
* The queue must be stopped prior to this call.
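* Example (a sketch; "myQueue" and "myJob" are hypothetical placeholders):
* driver.waitToStop("myQueue", 10000L); // make sure the queue is stopped first
* driver.deleteJob("myQueue", "myJob");
* driver.resume("myQueue");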
* @param queue queue name
* @param job job name, denamespaced
*/
public void deleteJob(final String queue, final String job) {
deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), false);
}
/**
* Delete a job from an existing named queue.
* The queue must be stopped prior to this call.
* @param queue queue name
* @param job job name, denamespaced
* @param forceDelete CAUTION: if set to true, all of the job's related ZK nodes will
* be cleaned up from ZooKeeper even if its workflow information cannot be found.
*/
public void deleteJob(final String queue, final String job, boolean forceDelete) {
deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), forceDelete);
}
/**
* Delete a job from an existing named queue.
* The queue must be stopped prior to this call.
* @param queue queue name
* @param job namespaced job name
* @param forceDelete CAUTION: if set to true, all of the job's related ZK nodes will
* be cleaned up from ZooKeeper even if its workflow information cannot be found.
*/
public void deleteNamespacedJob(final String queue, final String job, boolean forceDelete) {
WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowCfg == null) {
if (forceDelete) {
// remove all job znodes if its original workflow config was already gone.
LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
boolean success = TaskUtil.removeJob(_accessor, _propertyStore, job);
if (!success) {
LOG.info("Failed to delete job: " + job + " from queue: " + queue);
throw new HelixException("Failed to delete job: " + job + " from queue: " + queue);
}
} else {
throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
}
return;
}
if (!workflowCfg.isJobQueue()) {
throw new IllegalArgumentException(queue + " is not a queue!");
}
boolean isRecurringWorkflow =
(workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring());
String denamespacedJob = TaskUtil.getDenamespacedJobName(queue, job);
if (isRecurringWorkflow) {
// delete job from the last scheduled queue if there exists one.
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
String lastScheduledQueue = null;
if (wCtx != null) {
lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow();
}
if (lastScheduledQueue != null) {
WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue);
if (lastWorkflowCfg != null) {
deleteJobFromQueue(lastScheduledQueue, denamespacedJob);
}
}
}
deleteJobFromQueue(queue, denamespacedJob);
}
/**
* Delete a job from a scheduled (non-recurrent) queue.
* @param queue
* @param job this must be a de-namespaced (plain) job name
*/
private void deleteJobFromQueue(final String queue, final String job) {
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
String workflowState = (workflowCtx != null) ? workflowCtx.getWorkflowState().name()
: TaskState.NOT_STARTED.name();
if (workflowState.equals(TaskState.IN_PROGRESS.name())) {
throw new IllegalStateException("Queue " + queue + " is still running!");
}
if (workflowState.equals(TaskState.COMPLETED.name())
|| workflowState.equals(TaskState.FAILED.name())
|| workflowState.equals(TaskState.ABORTED.name())) {
LOG.warn(
"Queue " + queue + " has already reached its final state, skip deleting job from it.");
return;
}
if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue,
Collections.singleton(TaskUtil.getNamespacedJobName(queue, job)), true)) {
LOG.error("Failed to delete job " + job + " from queue " + queue);
throw new HelixException("Failed to delete job " + job + " from queue " + queue);
}
}
/**
* Adds a new job to the end of an existing named queue.
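* Example (a sketch; "myQueue", "myJob", and the command "MyTask" are hypothetical placeholders):
* JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("MyTask");
* driver.enqueueJob("myQueue", "myJob", jobBuilder);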
* @param queue
* @param job
* @param jobBuilder
*/
public void enqueueJob(final String queue, final String job, JobConfig.Builder jobBuilder) {
enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder));
}
/**
* Batch add jobs to the end of an existing named queue; the jobs are appended in the order of the given list.
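* Example (a sketch; the names and builders are hypothetical placeholders):
* driver.enqueueJobs("myQueue", Arrays.asList("job1", "job2"),
* Arrays.asList(jobBuilder1, jobBuilder2));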
* @param queue
* @param jobs
* @param jobBuilders
*/
public void enqueueJobs(final String queue, final List<String> jobs,
final List<JobConfig.Builder> jobBuilders) {
// Get the job queue config and capacity
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
}
if (workflowConfig.isTerminable()) {
throw new IllegalArgumentException(queue + " is not a queue!");
}
final int capacity = workflowConfig.getCapacity();
int queueSize = workflowConfig.getJobDag().size();
if (capacity > 0 && queueSize >= capacity) {
// if the queue is full, Helix will try to clean up expired jobs to free up space.
WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
if (workflowContext != null) {
Set<String> expiredJobs =
TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
LOG.warn("Failed to clean up expired and completed jobs from queue " + queue);
}
}
workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig.getJobDag().size() >= capacity) {
throw new HelixException("Failed to enqueue a job, queue is full.");
}
}
validateZKNodeLimitation(1);
final List<JobConfig> jobConfigs = new ArrayList<>();
final List<String> namespacedJobNames = new ArrayList<>();
final List<String> jobTypeList = new ArrayList<>();
try {
for (int i = 0; i < jobBuilders.size(); i++) {
// Create the job to ensure that it validates
JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build();
String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i));
// add job config first.
addJobConfig(namespacedJobName, jobConfig);
jobConfigs.add(jobConfig);
namespacedJobNames.add(namespacedJobName);
jobTypeList.add(jobConfig.getJobType());
}
} catch (HelixException e) {
LOG.error("Failed to add job configs {}. Removing them all!", jobs.toString());
for (String job : jobs) {
String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
TaskUtil.removeJobConfig(_accessor, namespacedJobName);
}
// Rethrow so that the DAG update below does not run against jobs whose configs were removed
throw e;
}
// update the job dag to append the job to the end of the queue.
DataUpdater<ZNRecord> updater = currentData -> {
if (currentData == null) {
// For some reason, the WorkflowConfig for this JobQueue doesn't exist
// In this case, we cannot proceed and must alert the user
throw new HelixException(
String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
}
// Add the node to the existing DAG
JobDag jobDag = JobDag
.fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Set<String> allNodes = jobDag.getAllNodes();
if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
throw new IllegalStateException(
String.format("Queue %s already reaches its max capacity %d, failed to add %s", queue,
capacity, jobs.toString()));
}
String lastNodeName = null;
for (int i = 0; i < namespacedJobNames.size(); i++) {
String namespacedJobName = namespacedJobNames.get(i);
if (allNodes.contains(namespacedJobName)) {
throw new IllegalStateException(String
.format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
}
jobDag.addNode(namespacedJobName);
// Add the node to the end of the queue
String candidate = null;
if (lastNodeName == null) {
for (String node : allNodes) {
if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
candidate = node;
break;
}
}
} else {
candidate = lastNodeName;
}
if (candidate != null) {
jobDag.addParentToChild(candidate, namespacedJobName);
lastNodeName = namespacedJobName;
}
}
// Add job type if job type is not null
Map<String, String> jobTypes =
currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
for (int i = 0; i < namespacedJobNames.size(); i++) {
String jobType = jobTypeList.get(i);
if (jobType != null) {
if (jobTypes == null) {
jobTypes = new HashMap<>();
}
// Key by the namespaced job name; keying by the queue name would overwrite prior entries
jobTypes.put(namespacedJobNames.get(i), jobType);
}
}
if (jobTypes != null) {
currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
}
// Save the updated DAG
try {
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(),
jobDag.toJson());
} catch (Exception e) {
throw new IllegalStateException(
String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
}
return currentData;
};
String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
if (!status) {
LOG.error("Failed to update WorkflowConfig, remove all jobs {}", jobs.toString());
for (String job : jobs) {
TaskUtil.removeJobConfig(_accessor, job);
}
throw new HelixException("Failed to enqueue job");
}
// This is to make it backward compatible with the old Helix task driver.
addWorkflowResourceIfNecessary(queue);
// Schedule the job
RebalanceScheduler.invokeRebalance(_accessor, queue);
}
/**
* Kept under the old API name for backward compatibility.
* @param queue
*/
@Deprecated
public void cleanupJobQueue(String queue) {
cleanupQueue(queue);
}
/**
* Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. The
* job configs and job contexts will be removed from ZooKeeper.
* @param queue The name of job queue
*/
public void cleanupQueue(String queue) {
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
}
if (!workflowConfig.isJobQueue() || workflowConfig.isTerminable()) {
throw new IllegalArgumentException(queue + " is not a queue!");
}
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue);
if (wCtx == null || wCtx.getWorkflowState() == null) {
throw new IllegalStateException("Queue " + queue + " does not have a valid work state!");
}
Set<String> jobs = new HashSet<>();
for (String jobNode : workflowConfig.getJobDag().getAllNodes()) {
TaskState curState = wCtx.getJobState(jobNode);
if (curState != null && curState == TaskState.ABORTED || curState == TaskState.COMPLETED
|| curState == TaskState.FAILED) {
jobs.add(jobNode);
}
}
TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true);
}
/** Posts new workflow resource to cluster */
private void addWorkflowResource(String workflow) {
// Add workflow resource
_admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME);
IdealState is = buildWorkflowIdealState(workflow);
TaskUtil.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE));
_admin.setResourceIdealState(_clusterName, workflow, is);
}
/**
* Posts new workflow resource to cluster if it does not exist
*/
private void addWorkflowResourceIfNecessary(String workflow) {
IdealState is = _admin.getResourceIdealState(_clusterName, workflow);
if (is == null) {
addWorkflowResource(workflow);
}
}
private IdealState buildWorkflowIdealState(String workflow) {
CustomModeISBuilder isBuilder = new CustomModeISBuilder(workflow);
isBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK).setNumReplica(1).setNumPartitions(1)
.setStateModel(TaskConstants.STATE_MODEL_NAME).disableExternalView();
IdealState is = isBuilder.build();
is.getRecord().setListField(workflow, new ArrayList<>());
is.getRecord().setMapField(workflow, new HashMap<>());
is.setRebalancerClassName(WorkflowRebalancer.class.getName());
return is;
}
/**
* Adds a new job config to the cluster via a create call
*/
private void addJobConfig(String job, JobConfig jobConfig) {
LOG.info("Add job configuration " + job);
// Set the job configuration
JobConfig newJobCfg = new JobConfig(job, jobConfig);
if (!TaskUtil.createJobConfig(_accessor, job, newJobCfg)) {
throw new HelixException("Failed to add job configuration for job " + job
+ ". It's possible that a job of the same name already exists or there was a connection issue");
}
}
/**
* Public method to resume a workflow/queue.
* @param workflow
*/
public void resume(String workflow) {
setWorkflowTargetState(workflow, TargetState.START);
}
/**
* Public async method to stop a workflow/queue.
* This call only sends the STOP command to Helix; it does not check
* whether the workflow (i.e., all of its jobs) has actually stopped.
* @param workflow
*/
public void stop(String workflow) {
setWorkflowTargetState(workflow, TargetState.STOP);
}
/**
* Public sync method to stop a workflow/queue with a timeout.
* The workflow and all of its jobs have been stopped if this method returns without throwing.
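* Example (a sketch; "myWorkflow" is a hypothetical placeholder):
* driver.waitToStop("myWorkflow", 30 * 1000L); // throws HelixException on timeout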
* @param workflow The workflow name
* @param timeout The timeout for stopping the workflow/queue, in milliseconds
*/
public void waitToStop(String workflow, long timeout) throws InterruptedException {
setWorkflowTargetState(workflow, TargetState.STOP);
long endTime = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() <= endTime) {
WorkflowContext workflowContext = getWorkflowContext(workflow);
if (workflowContext == null
|| TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) {
Thread.sleep(1000);
} else {
// Successfully stopped
return;
}
}
// Failed to stop with timeout
throw new HelixException(String
.format("Fail to stop the workflow/queue %s with in %d milliseconds.", workflow, timeout));
}
/**
* Public method to delete a workflow/queue.
* @param workflow
*/
public void delete(String workflow) {
delete(workflow, false);
}
/**
* Public method to delete a workflow/queue.
* @param workflow
* @param forceDelete CAUTION: if set to true, the workflow's and all of its jobs' related ZK
* nodes will be cleaned up from ZooKeeper immediately, regardless of whether
* any jobs are still running.
*/
public void delete(String workflow, boolean forceDelete) {
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
if (forceDelete) {
// if forceDelete, remove the workflow and its jobs immediately from zookeeper.
LOG.info("Forcefully removing workflow: " + workflow);
removeWorkflowFromZK(workflow);
} else {
// Set the workflow target state to DELETE, and let the Helix controller remove the workflow.
// The controller may remove the workflow instantly, so record the context before setting the DELETE state.
setWorkflowTargetState(workflow, TargetState.DELETE);
}
// Delete all previously scheduled workflows.
if (wCtx != null && wCtx.getScheduledWorkflows() != null) {
for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) {
if (forceDelete) {
// Only directly delete it if it's force delete. Otherwise, it will cause a race condition
// where contexts would get written back to ZK from cache
WorkflowContext scheduledWorkflowCtx =
TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
if (scheduledWorkflowCtx != null
&& scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
removeWorkflowFromZK(scheduledWorkflow);
}
} else {
setWorkflowTargetState(scheduledWorkflow, TargetState.DELETE);
}
}
}
}
private void removeWorkflowFromZK(String workflow) {
Set<String> jobSet = new HashSet<>();
// Note that even if WorkflowConfig is null, the workflow still needs to be removed
// if WorkflowContext exists
WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow);
if (wCfg != null) {
jobSet.addAll(wCfg.getJobDag().getAllNodes());
}
boolean success = TaskUtil.removeWorkflow(_accessor, _propertyStore, workflow, jobSet);
if (!success) {
LOG.info("Failed to delete the workflow " + workflow);
throw new HelixException("Failed to delete the workflow " + workflow);
}
}
/**
* Public synchronous method to wait for a delete operation to fully complete, with a timeout.
* When this method returns, it means that a queue (workflow) has been completely deleted, meaning
* its IdealState, WorkflowConfig, and WorkflowContext have all been deleted.
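* Example (a sketch; "myQueue" is a hypothetical placeholder):
* driver.deleteAndWaitForCompletion("myQueue", 60 * 1000L);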
* @param workflow workflow/jobqueue name
* @param timeout time allowed for the delete operation to complete, in milliseconds
*/
public void deleteAndWaitForCompletion(String workflow, long timeout)
throws InterruptedException {
delete(workflow);
long endTime = System.currentTimeMillis() + timeout;
// For checking whether delete completed
BaseDataAccessor<ZNRecord> baseDataAccessor = _accessor.getBaseDataAccessor();
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
String idealStatePath = keyBuilder.idealStates(workflow).getPath();
String workflowConfigPath = keyBuilder.resourceConfig(workflow).getPath();
String workflowContextPath = keyBuilder.workflowContext(workflow).getPath();
while (System.currentTimeMillis() <= endTime) {
if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)
|| baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)
|| baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
Thread.sleep(1000);
} else {
return;
}
}
// Deletion failed: check which step of deletion failed to complete and create an error message
StringBuilder failed = new StringBuilder();
if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)) {
failed.append("IdealState ");
}
if (baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)) {
failed.append("WorkflowConfig ");
}
if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) {
failed.append("WorkflowContext ");
}
throw new HelixException(
String.format(
"Failed to delete the workflow/queue %s within %d milliseconds. "
+ "The following components still remain: %s",
workflow, timeout, failed.toString()));
}
/**
* Helper function to change target state for a given workflow
*/
private void setWorkflowTargetState(String workflow, TargetState state) {
setSingleWorkflowTargetState(workflow, state);
// For recurring schedules, last scheduled incomplete workflow must also be handled
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
if (wCtx != null) {
String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
if (lastScheduledWorkflow != null) {
setSingleWorkflowTargetState(lastScheduledWorkflow, state);
}
}
}
/**
* Helper function to change target state for a given workflow
*/
private void setSingleWorkflowTargetState(String workflow, final TargetState state) {
LOG.info("Set " + workflow + " to target state " + state);
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow);
if (workflowConfig == null) {
LOG.warn("WorkflowConfig for " + workflow + " not found!");
return;
}
WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow);
if (state != TargetState.DELETE && workflowContext != null
&& workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) {
// Should not update the target state of a completed workflow
LOG.info("Workflow " + workflow + " is already completed; skip updating its target state to "
+ state);
return;
}
DataUpdater<ZNRecord> updater = currentData -> {
if (currentData != null) {
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
state.name());
} else {
LOG.warn("TargetState DataUpdater: Fails to update target state. CurrentData is null.");
}
return currentData;
};
PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow);
_accessor.getBaseDataAccessor().update(workflowConfigKey.getPath(), updater,
AccessOption.PERSISTENT);
RebalanceScheduler.invokeRebalance(_accessor, workflow);
}
public WorkflowConfig getWorkflowConfig(String workflow) {
return TaskUtil.getWorkflowConfig(_accessor, workflow);
}
public WorkflowContext getWorkflowContext(String workflow) {
return TaskUtil.getWorkflowContext(_propertyStore, workflow);
}
public JobConfig getJobConfig(String job) {
return TaskUtil.getJobConfig(_accessor, job);
}
public JobContext getJobContext(String job) {
return TaskUtil.getJobContext(_propertyStore, job);
}
public static JobContext getJobContext(HelixManager manager, String job) {
return TaskUtil.getJobContext(manager, job);
}
public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) {
return TaskUtil.getWorkflowConfig(manager, workflow);
}
public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) {
return TaskUtil.getWorkflowContext(manager, workflow);
}
public static JobConfig getJobConfig(HelixManager manager, String job) {
return TaskUtil.getJobConfig(manager, job);
}
/**
* Batch get the configurations of all workflows in this cluster.
* @return a map of workflow name to {@link WorkflowConfig}
*/
public Map<String, WorkflowConfig> getWorkflows() {
Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
Map<String, ResourceConfig> resourceConfigMap =
_accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs());
for (Map.Entry<String, ResourceConfig> resource : resourceConfigMap.entrySet()) {
try {
WorkflowConfig config = WorkflowConfig.fromHelixProperty(resource.getValue());
workflowConfigMap.put(resource.getKey(), config);
} catch (IllegalArgumentException ex) {
// ignore if it is not a workflow config.
}
}
return workflowConfigMap;
}
/**
* This call blocks until either the workflow reaches one of the states specified
* in the arguments or the timeout expires. If the timeout expires, it throws a HelixException;
* otherwise, it returns the current workflow state.
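* Example (a sketch; "myWorkflow" is a hypothetical placeholder):
* TaskState state = driver.pollForWorkflowState("myWorkflow", 60 * 1000L,
* TaskState.COMPLETED, TaskState.FAILED);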
* @param workflowName The workflow to be monitored
* @param timeout Timeout, in milliseconds
* @param targetStates Specified states that user would like to stop monitoring
* @return A TaskState, which is current workflow state
* @throws InterruptedException
*/
public TaskState pollForWorkflowState(String workflowName, long timeout,
TaskState... targetStates) throws InterruptedException {
// Wait for completion.
long st = System.currentTimeMillis();
WorkflowContext ctx;
Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(targetStates));
long timeToSleep = timeout > 100L ? 100L : timeout;
do {
Thread.sleep(timeToSleep);
ctx = getWorkflowContext(workflowName);
} while ((ctx == null || ctx.getWorkflowState() == null
|| !allowedStates.contains(ctx.getWorkflowState()))
&& System.currentTimeMillis() < st + timeout);
if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
throw new HelixException(String.format(
"Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
workflowName, Arrays.asList(targetStates),
ctx == null ? "null" : ctx.getWorkflowState().toString()));
}
return ctx.getWorkflowState();
}
/**
* This is a wrapper function that sets the default timeout for monitoring the workflow
* to 3 MINUTES. If the timeout expires, it throws a HelixException; otherwise, it returns
* the current workflow state.
* @param workflowName The workflow to be monitored
* @param targetStates Specified states that user would like to stop monitoring
* @return A TaskState, which is current workflow state
* @throws InterruptedException
*/
public TaskState pollForWorkflowState(String workflowName, TaskState... targetStates)
throws InterruptedException {
return pollForWorkflowState(workflowName, _defaultTimeout, targetStates);
}
/**
* This call blocks until either the specified job reaches one of the states given
* in the arguments or the timeout expires. If the timeout expires, it throws a HelixException;
* otherwise, it returns the current job state.
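* Example (a sketch; "myWorkflow" and "myJob" are hypothetical placeholders; note that the
* job name passed in is the namespaced job name):
* TaskState jobState = driver.pollForJobState("myWorkflow",
* TaskUtil.getNamespacedJobName("myWorkflow", "myJob"), 60 * 1000L, TaskState.COMPLETED);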
* @param workflowName The workflow that contains the job to monitor
* @param jobName The specified job to monitor
* @param timeout Timeout, in milliseconds
* @param states Specified states that user would like to stop monitoring
* @return A TaskState, which is current job state
* @throws InterruptedException
*/
public TaskState pollForJobState(String workflowName, String jobName, long timeout,
TaskState... states) throws InterruptedException {
// Get workflow config
WorkflowConfig workflowConfig = getWorkflowConfig(workflowName);
if (workflowConfig == null) {
throw new HelixException(String.format("Workflow \"%s\" does not exists!", workflowName));
}
long timeToSleep = timeout > 50L ? 50L : timeout;
WorkflowContext ctx;
if (workflowConfig.isRecurring()) {
// if it's recurring, need to reconstruct workflow and job name
do {
Thread.sleep(timeToSleep);
ctx = getWorkflowContext(workflowName);
} while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
jobName = jobName.substring(workflowName.length() + 1);
workflowName = ctx.getLastScheduledSingleWorkflow();
}
Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(states));
// Wait for state
long st = System.currentTimeMillis();
do {
Thread.sleep(timeToSleep);
ctx = getWorkflowContext(workflowName);
} while ((ctx == null || ctx.getJobState(jobName) == null
|| !allowedStates.contains(ctx.getJobState(jobName)))
&& System.currentTimeMillis() < st + timeout);
if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) {
throw new HelixException(
String.format("Workflow \"%s\" context is null or job \"%s\" is not in states: %s",
workflowName, jobName, allowedStates));
}
return ctx.getJobState(jobName);
}
/**
* This is a wrapper function for monitoring job state with the default timeout of 3 MINUTES.
* If the timeout expires, it throws a HelixException; otherwise, it returns the
* current job state.
* @param workflowName The workflow that contains the job to monitor
* @param jobName The specified job to monitor
* @param states Specified states that user would like to stop monitoring
* @return A TaskState, which is current job state
* @throws InterruptedException
*/
public TaskState pollForJobState(String workflowName, String jobName, TaskState... states)
throws InterruptedException {
return pollForJobState(workflowName, jobName, _defaultTimeout, states);
}
/**
* This function returns the timestamp of the very last task that was scheduled.
* It is provided to help determine whether a given Workflow/Job/Task is stuck.
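* Example (a sketch; "myWorkflow" and the one-hour staleness threshold are hypothetical):
* long lastStart = driver.getLastScheduledTaskTimestamp("myWorkflow");
* boolean possiblyStuck = lastStart != -1L
* && System.currentTimeMillis() - lastStart > 3600 * 1000L;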
* @param workflowName The name of the workflow
* @return timestamp of the most recent job scheduled.
* -1L if timestamp is not set (either nothing is scheduled or no start time recorded).
*/
public long getLastScheduledTaskTimestamp(String workflowName) {
return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp();
}
public TaskExecutionInfo getLastScheduledTaskExecutionInfo(String workflowName) {
long lastScheduledTaskTimestamp = TaskExecutionInfo.TIMESTAMP_NOT_SET;
String jobName = null;
Integer taskPartitionIndex = null;
TaskPartitionState state = null;
WorkflowContext workflowContext = getWorkflowContext(workflowName);
if (workflowContext != null) {
Map<String, TaskState> allJobStates = workflowContext.getJobStates();
for (Map.Entry<String, TaskState> jobState : allJobStates.entrySet()) {
if (!jobState.getValue().equals(TaskState.NOT_STARTED)) {
JobContext jobContext = getJobContext(jobState.getKey());
if (jobContext != null) {
Set<Integer> allPartitions = jobContext.getPartitionSet();
for (Integer partition : allPartitions) {
String startTime = jobContext.getMapField(partition).get(TASK_START_TIME_KEY);
if (startTime != null) {
long startTimeLong = Long.parseLong(startTime);
if (startTimeLong > lastScheduledTaskTimestamp) {
lastScheduledTaskTimestamp = startTimeLong;
jobName = jobState.getKey();
taskPartitionIndex = partition;
state = jobContext.getPartitionState(partition);
}
}
}
}
}
}
}
return new TaskExecutionInfo(jobName, taskPartitionIndex, state, lastScheduledTaskTimestamp);
}
/**
* Returns the lookup of UserContentStore by key.
* @param key key used at write time by a task implementing UserContentStore
* @param scope scope used at write time
* @param workflowName name of workflow. Must be supplied
* @param jobName name of job. Optional if scope is WORKFLOW
* @param taskName name of task. Optional if scope is WORKFLOW or JOB
* @return null if key-value pair not found or this content store does not exist. Otherwise,
* return a String
* @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)},
* {@link #getJobUserContentMap(String, String)},
* {@link #getTaskUserContentMap(String, String, String)}
*/
@Deprecated
public String getUserContent(String key, UserContentStore.Scope scope, String workflowName,
String jobName, String taskName) {
return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName);
}
/**
* Return the full user content map for workflow
* @param workflowName workflow name
* @return user content map
*/
public Map<String, String> getWorkflowUserContentMap(String workflowName) {
return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, workflowName);
}
/**
* Return full user content map for job
* @param workflowName workflow name
* @param jobName Un-namespaced job name
* @return user content map
*/
public Map<String, String> getJobUserContentMap(String workflowName, String jobName) {
String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, namespacedJobName);
}
/**
* Return full user content map for task
* @param workflowName workflow name
* @param jobName Un-namespaced job name
* @param taskPartitionId task partition id
* @return user content map
*/
public Map<String, String> getTaskUserContentMap(String workflowName, String jobName,
String taskPartitionId) {
String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName);
}
/**
* Add or update workflow user content with the given map - new keys will be added, and old
* keys will be updated
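* Example (a sketch; "myWorkflow" and the key/value pair are hypothetical placeholders):
* driver.addOrUpdateWorkflowUserContentMap("myWorkflow",
* Collections.singletonMap("myKey", "myValue"));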
* @param workflowName workflow name
* @param contentToAddOrUpdate map containing items to add or update
*/
public void addOrUpdateWorkflowUserContentMap(String workflowName,
final Map<String, String> contentToAddOrUpdate) {
TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName,
contentToAddOrUpdate);
}
/**
* Add or update job user content with the given map - new keys will be added, and old keys will
* be updated
* @param workflowName workflow name
* @param jobName Un-namespaced job name
* @param contentToAddOrUpdate map containing items to add or update
*/
public void addOrUpdateJobUserContentMap(String workflowName, String jobName,
final Map<String, String> contentToAddOrUpdate) {
String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, namespacedJobName,
contentToAddOrUpdate);
}
/**
* Add or update task user content with the given map - new keys will be added, and old keys
* will be updated
* @param workflowName workflow name
* @param jobName Un-namespaced job name
* @param taskPartitionId task partition id
* @param contentToAddOrUpdate map containing items to add or update
*/
public void addOrUpdateTaskUserContentMap(String workflowName, String jobName,
String taskPartitionId, final Map<String, String> contentToAddOrUpdate) {
String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName);
String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId);
TaskUtil.addOrUpdateTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName,
contentToAddOrUpdate);
}
/**
* Throws an exception if the number of child nodes would exceed the limitation after adding
* newConfigNodeCount children.
* @param newConfigNodeCount
*/
private void validateZKNodeLimitation(int newConfigNodeCount) {
List<String> resourceConfigs =
_accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
if (resourceConfigs.size() + newConfigNodeCount > _configsLimitation) {
throw new HelixException(
"Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
}
}
}