| package org.apache.helix.task; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyPathBuilder; |
| import org.apache.helix.SystemPropertyKeys; |
| import org.apache.helix.manager.zk.ZKHelixAdmin; |
| import org.apache.helix.manager.zk.ZKHelixDataAccessor; |
| import org.apache.helix.manager.zk.ZkBaseDataAccessor; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.ResourceConfig; |
| import org.apache.helix.store.HelixPropertyStore; |
| import org.apache.helix.store.zk.ZkHelixPropertyStore; |
| import org.apache.helix.util.HelixUtil; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.zkclient.DataUpdater; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * CLI for scheduling/canceling workflows |
| */ |
| public class TaskDriver { |
| |
| public enum DriverCommand { |
| start, |
| stop, |
| delete, |
| resume, |
| list, |
| flush, |
| clean |
| } |
| |
| /** For logging */ |
| private static final Logger LOG = LoggerFactory.getLogger(TaskDriver.class); |
| |
| /** Default time out for monitoring workflow or job state */ |
| private final static int DEFAULT_TIMEOUT = 5 * 60 * 1000; /* 5 mins */ |
| |
| /** Default sleep time for requests */ |
| private final static long DEFAULT_SLEEP = 1000L; /* 1 second */ |
| |
| /** The illegal job states for job to accept new tasks */ |
| private final static Set<TaskState> ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION = new HashSet<>( |
| Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING, TaskState.FAILED, |
| TaskState.ABORTED, TaskState.COMPLETED, TaskState.STOPPING, TaskState.STOPPED)); |
| |
| // HELIX-619 This is a temporary solution for too many ZK nodes issue. |
| // Limit workflows/jobs creation to prevent the problem. |
| // |
| // Note this limitation should be smaller than ZK capacity. If current nodes count already exceeds |
| // the CAP, the verification method will not throw exception since the getChildNames() call will |
| // return empty list. |
| // |
| // TODO Implement or configure the limitation in ZK server. |
| private final static long DEFAULT_CONFIGS_LIMITATION = |
| HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 100000L); |
| private final static String TASK_START_TIME_KEY = "START_TIME"; |
| protected long _configsLimitation = DEFAULT_CONFIGS_LIMITATION; |
| |
| private final HelixDataAccessor _accessor; |
| private final HelixPropertyStore<ZNRecord> _propertyStore; |
| private final HelixAdmin _admin; |
| private final String _clusterName; |
| |
| public TaskDriver(HelixManager manager) { |
| this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), |
| manager.getHelixPropertyStore(), manager.getClusterName()); |
| } |
| |
| @Deprecated |
| public TaskDriver(RealmAwareZkClient client, String clusterName) { |
| this(client, new ZkBaseDataAccessor<>(client), clusterName); |
| } |
| |
| @Deprecated |
| public TaskDriver(RealmAwareZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, |
| String clusterName) { |
| this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor), |
| new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName), |
| null), |
| clusterName); |
| } |
| |
| @Deprecated |
| public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor, |
| HelixPropertyStore<ZNRecord> propertyStore, String clusterName) { |
| this(admin, accessor, propertyStore, clusterName); |
| } |
| |
| public TaskDriver(HelixAdmin admin, HelixDataAccessor accessor, |
| HelixPropertyStore<ZNRecord> propertyStore, String clusterName) { |
| _admin = admin; |
| _accessor = accessor; |
| _propertyStore = propertyStore; |
| _clusterName = clusterName; |
| } |
| |
| /** |
| * Schedules a new workflow |
| * @param flow |
| */ |
| public void start(Workflow flow) { |
| LOG.info("Starting workflow {}", flow.getName()); |
| flow.validate(); |
| |
| validateZKNodeLimitation(flow.getJobConfigs().keySet().size() + 1); |
| |
| WorkflowConfig newWorkflowConfig = |
| new WorkflowConfig.Builder(flow.getWorkflowConfig()).setWorkflowId(flow.getName()).build(); |
| |
| Map<String, String> jobTypes = new HashMap<>(); |
| // add all job configs. |
| for (String job : flow.getJobConfigs().keySet()) { |
| JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job)); |
| if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) { |
| jobCfgBuilder.addTaskConfigs(flow.getTaskConfigs().get(job)); |
| } |
| JobConfig jobCfg = jobCfgBuilder.build(); |
| if (jobCfg.getJobType() != null) { |
| jobTypes.put(job, jobCfg.getJobType()); |
| } |
| addJobConfig(job, jobCfg); |
| } |
| newWorkflowConfig.setJobTypes(jobTypes); |
| |
| // add workflow config. |
| if (!TaskUtil.createWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) { |
| // workflow config creation failed; try to delete job configs back |
| Set<String> failedJobRemoval = new HashSet<>(); |
| for (String job : flow.getJobConfigs().keySet()) { |
| if (!TaskUtil.removeJobConfig(_accessor, job)) { |
| failedJobRemoval.add(job); |
| } |
| } |
| throw new HelixException(String.format( |
| "Failed to add workflow configuration for workflow %s. It's possible that a workflow of the same name already exists or there was a connection issue. JobConfig deletion attempted but failed for the following jobs: %s", |
| flow.getName(), failedJobRemoval)); |
| } |
| } |
| |
| /** |
| * Update the configuration of a non-terminable workflow (queue). |
| * The terminable workflow's configuration is not allowed |
| * to change once created. |
| * Note: |
| * For recurrent workflow, the current running schedule will not be effected, |
| * the new configuration will be applied to the next scheduled runs of the workflow. |
| * For non-recurrent workflow, the new configuration may (or may not) be applied |
| * on the current running jobs, but it will be applied on the following unscheduled jobs. |
| * Example: |
| * _driver = new TaskDriver ... |
| * WorkflowConfig currentWorkflowConfig = _driver.getWorkflowCfg(_manager, workflow); |
| * WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig); |
| * // make needed changes to the config here |
| * configBuilder.setXXX(); |
| * // update workflow configuration |
| * _driver.updateWorkflow(workflow, configBuilder.build()); |
| * @param workflow |
| * @param newWorkflowConfig |
| */ |
| public void updateWorkflow(String workflow, WorkflowConfig newWorkflowConfig) { |
| if (newWorkflowConfig.getWorkflowId() == null || newWorkflowConfig.getWorkflowId().isEmpty()) { |
| newWorkflowConfig.getRecord() |
| .setSimpleField(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name(), workflow); |
| } |
| if (workflow == null || !workflow.equals(newWorkflowConfig.getWorkflowId())) { |
| throw new HelixException(String.format( |
| "Workflow name {%s} does not match the workflow Id from WorkflowConfig {%s}", workflow, |
| newWorkflowConfig.getWorkflowId())); |
| } |
| |
| WorkflowConfig currentConfig = TaskUtil.getWorkflowConfig(_accessor, workflow); |
| if (currentConfig == null) { |
| throw new HelixException("Workflow " + workflow + " does not exist!"); |
| } |
| |
| if (currentConfig.isTerminable()) { |
| throw new HelixException( |
| "Workflow " + workflow + " is terminable, not allow to change its configuration!"); |
| } |
| |
| // Should not let user changing DAG in the workflow |
| newWorkflowConfig.setJobDag(currentConfig.getJobDag()); |
| if (!TaskUtil.setWorkflowConfig(_accessor, workflow, newWorkflowConfig)) { |
| LOG.error("Failed to update workflow configuration for workflow {}", workflow); |
| } |
| } |
| |
| /** |
| * Creates a new named job queue (workflow) |
| * @param queue |
| */ |
| public void createQueue(JobQueue queue) { |
| start(queue); |
| } |
| |
| /** |
| * Remove all completed or failed jobs in a job queue |
| * Same as {@link #cleanupQueue(String)} |
| * @param queue name of the queue |
| * @throws Exception |
| */ |
| public void flushQueue(String queue) { |
| cleanupQueue(queue); |
| } |
| |
| /** |
| * Delete a job from an existing named queue, |
| * the queue has to be stopped prior to this call |
| * @param queue queue name |
| * @param job job name, denamespaced |
| */ |
| public void deleteJob(final String queue, final String job) { |
| deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), false); |
| } |
| |
| /** |
| * Delete a job from an existing named queue, |
| * the queue has to be stopped prior to this call |
| * @param queue queue name |
| * @param job job name, denamespaced |
| * @param forceDelete CAUTION: if set true, all job's related zk nodes will |
| * be clean up from zookeeper even if its workflow information can not be found. |
| */ |
| public void deleteJob(final String queue, final String job, boolean forceDelete) { |
| deleteNamespacedJob(queue, TaskUtil.getNamespacedJobName(queue, job), forceDelete); |
| } |
| |
| /** |
| * Delete a job from an existing named queue, |
| * the queue has to be stopped prior to this call |
| * @param queue queue name |
| * @param job job name: namespaced job name |
| * @param forceDelete CAUTION: if set true, all job's related zk nodes will |
| * be removed from zookeeper even if its JobQueue information can not be found. |
| */ |
| public void deleteNamespacedJob(final String queue, final String job, boolean forceDelete) { |
| WorkflowConfig jobQueueConfig = TaskUtil.getWorkflowConfig(_accessor, queue); |
| boolean isRecurringWorkflow; |
| |
| // Force deletion of a job |
| if (forceDelete) { |
| // remove all job-related ZNodes |
| LOG.info("Forcefully removing job: {} from queue: {}", job, queue); |
| if (!TaskUtil.removeJob(_accessor, _propertyStore, job)) { |
| LOG.info("Failed to delete job: {} from queue: {}", job, queue); |
| throw new HelixException("Failed to delete job: " + job + " from queue: " + queue); |
| } |
| // In case this was a recurrent workflow, remove it from last scheduled queue as well |
| if (jobQueueConfig != null) { |
| isRecurringWorkflow = jobQueueConfig.getScheduleConfig() != null |
| && jobQueueConfig.getScheduleConfig().isRecurring(); |
| if (isRecurringWorkflow) { |
| deleteJobFromLastScheduledQueue(queue, TaskUtil.getDenamespacedJobName(queue, job)); |
| } |
| } |
| return; |
| } |
| |
| // Regular, non-force, deletion of a job |
| if (jobQueueConfig == null) { |
| throw new IllegalArgumentException( |
| String.format("JobQueue %s's config is not found!", queue)); |
| } |
| if (!jobQueueConfig.isJobQueue()) { |
| throw new IllegalArgumentException(String.format("%s is not a queue!", queue)); |
| } |
| isRecurringWorkflow = jobQueueConfig.getScheduleConfig() != null |
| && jobQueueConfig.getScheduleConfig().isRecurring(); |
| String denamespacedJob = TaskUtil.getDenamespacedJobName(queue, job); |
| if (isRecurringWorkflow) { |
| deleteJobFromLastScheduledQueue(queue, denamespacedJob); |
| } |
| deleteJobFromQueue(queue, denamespacedJob); |
| } |
| |
| /** |
| * Delete the given job from the last-scheduled queue for recurrent workflows. |
| * @param queue |
| * @param denamespacedJob |
| */ |
| private void deleteJobFromLastScheduledQueue(String queue, String denamespacedJob) { |
| // delete job from the last scheduled queue if there exists one. |
| WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue); |
| String lastScheduledQueue = null; |
| if (wCtx != null) { |
| lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow(); |
| } |
| if (lastScheduledQueue != null) { |
| WorkflowConfig lastWorkflowCfg = TaskUtil.getWorkflowConfig(_accessor, lastScheduledQueue); |
| if (lastWorkflowCfg != null) { |
| deleteJobFromQueue(lastScheduledQueue, denamespacedJob); |
| } |
| } |
| } |
| |
| /** |
| * Delete a job from a scheduled (non-recurrent) queue. |
| * @param queue |
| * @param job this must be a namespaced job name |
| */ |
| private void deleteJobFromQueue(final String queue, final String job) { |
| WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_propertyStore, queue); |
| String workflowState = (workflowCtx != null) ? workflowCtx.getWorkflowState().name() |
| : TaskState.NOT_STARTED.name(); |
| |
| if (workflowState.equals(TaskState.IN_PROGRESS.name())) { |
| throw new IllegalStateException("Queue " + queue + " is still running!"); |
| } |
| |
| if (workflowState.equals(TaskState.COMPLETED.name()) |
| || workflowState.equals(TaskState.FAILED.name()) |
| || workflowState.equals(TaskState.ABORTED.name())) { |
| LOG.warn("Queue {} has already reached its final state, skip deleting job from it.", queue); |
| return; |
| } |
| |
| if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, |
| Collections.singleton(TaskUtil.getNamespacedJobName(queue, job)), true)) { |
| LOG.error("Failed to delete job {} from queue {}.", job, queue); |
| throw new HelixException("Failed to delete job " + job + " from queue " + queue); |
| } |
| } |
| |
| /** |
| * Adds a new job to the end an existing named queue. |
| * @param queue |
| * @param job |
| * @param jobBuilder |
| * @throws Exception |
| */ |
| public void enqueueJob(final String queue, final String job, JobConfig.Builder jobBuilder) { |
| enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder)); |
| } |
| |
| /** |
| * Batch add jobs to queues that garantee |
| * @param queue |
| * @param jobs |
| * @param jobBuilders |
| */ |
| public void enqueueJobs(final String queue, final List<String> jobs, |
| final List<JobConfig.Builder> jobBuilders) { |
| |
| // Get the job queue config and capacity |
| WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue); |
| if (workflowConfig == null) { |
| throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!"); |
| } |
| if (workflowConfig.isTerminable()) { |
| throw new IllegalArgumentException(queue + " is not a queue!"); |
| } |
| |
| final int capacity = workflowConfig.getCapacity(); |
| int queueSize = workflowConfig.getJobDag().size(); |
| if (capacity > 0 && queueSize >= capacity) { |
| // if queue is full, Helix will try to clean up the expired job to free more space. |
| WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue); |
| if (workflowContext != null) { |
| Set<String> expiredJobs = |
| TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext); |
| if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) { |
| LOG.warn("Failed to clean up expired and completed jobs from queue {}", queue); |
| } |
| } |
| workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue); |
| if (workflowConfig.getJobDag().size() >= capacity) { |
| throw new HelixException(String.format( "Failed to enqueue job, queue %s is full.", queue)); |
| } |
| } |
| |
| // Fail the operation if adding new jobs will cause the queue to reach its capacity limit |
| workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue); |
| if (workflowConfig.getJobDag().size() + jobs.size() >= capacity) { |
| throw new IllegalStateException( |
| String.format("Queue %s already reaches its max capacity %d, failed to add %s", queue, |
| capacity, jobs.toString())); |
| } |
| |
| validateZKNodeLimitation(1); |
| final List<JobConfig> jobConfigs = new ArrayList<>(); |
| final List<String> namespacedJobNames = new ArrayList<>(); |
| final List<String> jobTypeList = new ArrayList<>(); |
| |
| try { |
| for (int i = 0; i < jobBuilders.size(); i++) { |
| // Create the job to ensure that it validates |
| JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build(); |
| String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i)); |
| |
| // add job config first. |
| addJobConfig(namespacedJobName, jobConfig); |
| jobConfigs.add(jobConfig); |
| namespacedJobNames.add(namespacedJobName); |
| jobTypeList.add(jobConfig.getJobType()); |
| } |
| } catch (HelixException e) { |
| LOG.error("Failed to add job configs {}. Remove them all!", jobs.toString()); |
| for (String job : jobs) { |
| String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job); |
| TaskUtil.removeJobConfig(_accessor, namespacedJobName); |
| } |
| } |
| |
| // update the job dag to append the job to the end of the queue. |
| DataUpdater<ZNRecord> updater = currentData -> { |
| if (currentData == null) { |
| // For some reason, the WorkflowConfig for this JobQueue doesn't exist |
| // In this case, we cannot proceed and must alert the user |
| throw new HelixException( |
| String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue)); |
| } |
| |
| // Add the node to the existing DAG |
| JobDag jobDag = JobDag |
| .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name())); |
| Set<String> allNodes = jobDag.getAllNodes(); |
| if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) { |
| // Remove previously added jobConfigs if adding new jobs will cause exceeding capacity |
| // limit. Removing the job configs is necessary to avoid multiple threads adding jobs at the |
| // same time and cause overcapacity queue |
| for (String job : jobs) { |
| String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job); |
| TaskUtil.removeJobConfig(_accessor, namespacedJobName); |
| } |
| throw new IllegalStateException( |
| String.format("Queue %s already reaches its max capacity %d, failed to add %s", queue, |
| capacity, jobs.toString())); |
| } |
| |
| String lastNodeName = null; |
| for (int i = 0; i < namespacedJobNames.size(); i++) { |
| String namespacedJobName = namespacedJobNames.get(i); |
| if (allNodes.contains(namespacedJobName)) { |
| throw new IllegalStateException(String |
| .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i))); |
| } |
| jobDag.addNode(namespacedJobName); |
| |
| // Add the node to the end of the queue |
| String candidate = null; |
| if (lastNodeName == null) { |
| for (String node : allNodes) { |
| if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) { |
| candidate = node; |
| break; |
| } |
| } |
| } else { |
| candidate = lastNodeName; |
| } |
| if (candidate != null) { |
| jobDag.addParentToChild(candidate, namespacedJobName); |
| lastNodeName = namespacedJobName; |
| } |
| } |
| |
| // Add job type if job type is not null |
| Map<String, String> jobTypes = |
| currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name()); |
| for (String jobType : jobTypeList) { |
| if (jobType != null) { |
| if (jobTypes == null) { |
| jobTypes = new HashMap<>(); |
| } |
| jobTypes.put(queue, jobType); |
| } |
| } |
| |
| if (jobTypes != null) { |
| currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes); |
| } |
| // Save the updated DAG |
| try { |
| currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), |
| jobDag.toJson()); |
| } catch (Exception e) { |
| throw new IllegalStateException( |
| String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e); |
| } |
| return currentData; |
| }; |
| |
| String path = _accessor.keyBuilder().resourceConfig(queue).getPath(); |
| boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT); |
| if (!status) { |
| LOG.error("Failed to update WorkflowConfig, remove all jobs {}", jobs.toString()); |
| for (String job : jobs) { |
| TaskUtil.removeJobConfig(_accessor, job); |
| } |
| throw new HelixException("Failed to enqueue job"); |
| } |
| } |
| |
| /** |
| * Add task to a running (IN-PROGRESS) job or a job which has not started yet. Timeout for this |
| * operation is the default timeout which is 5 minutes. {@link TaskDriver#DEFAULT_TIMEOUT} |
| * Note1: Task cannot be added if the job is in an illegal state. A job can accept |
| * new task if the job is in-progress or it has not started yet. |
| * Note2: The tasks can only be added to non-targeted jobs. |
| * Note3: The taskID for the new task should be unique. If not, this API throws an exception. |
| * Note4: In case of timeout exception, it is the user's responsibility to check whether the task |
| * has been successfully added or not. |
| * Note5: If there is a delay in scheduling the tasks this API may fail. |
| * @param workflowName |
| * @param jobName |
| * @param taskConfig |
| * @throws TimeoutException if the outcome of the task addition is unknown and cannot be verified |
| * @throws IllegalArgumentException if the inputs are invalid |
| * @throws HelixException if the job is not in the states to accept a new task or if there is any |
| * issue in updating jobConfig. |
| */ |
| public void addTask(String workflowName, String jobName, TaskConfig taskConfig) |
| throws TimeoutException, InterruptedException { |
| addTask(workflowName, jobName, taskConfig, DEFAULT_TIMEOUT); |
| } |
| |
| /** |
| * Add task to a running (IN-PROGRESS) job or a job which has not started yet |
| * Note1: Task cannot be added if the job is in an illegal state. A job can accept |
| * new task if the job is in-progress or it has not started yet. |
| * Note2: The tasks can only be added to non-targeted jobs. |
| * Note3: The taskID for the new task should be unique. If not, this API throws an exception. |
| * Note4: In case of timeout exception, it is the user's responsibility to check whether the task |
| * has been successfully added or not. |
| * Note5: timeout is the time that this API checks whether the task has been successfully added or |
| * not. |
| * Note6: If there is a delay in scheduling the tasks this API may fail. |
| * @param workflowName |
| * @param jobName |
| * @param taskConfig |
| * @param timeoutMs |
| * @throws TimeoutException if the outcome of the task addition is unknown and cannot be verified |
| * @throws IllegalArgumentException if the inputs are invalid |
| * @throws HelixException if the job is not in the states to accept a new task or if there is any |
| * issue in updating jobConfig. |
| */ |
| public void addTask(String workflowName, String jobName, TaskConfig taskConfig, long timeoutMs) |
| throws TimeoutException, InterruptedException { |
| |
| if (timeoutMs < DEFAULT_SLEEP) { |
| throw new IllegalArgumentException( |
| String.format("Timeout is less than the minimum acceptable timeout value which is %s ms", |
| DEFAULT_SLEEP)); |
| } |
| |
| long endTime = System.currentTimeMillis() + timeoutMs; |
| |
| validateConfigsForTaskModifications(workflowName, jobName, taskConfig); |
| |
| String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName); |
| for (String taskEntry : jobConfig.getMapConfigs().keySet()) { |
| if (taskEntry.equals(taskConfig.getId())) { |
| throw new HelixException( |
| "Task cannot be added because another task with the same ID already exists!"); |
| } |
| } |
| |
| WorkflowContext workflowContext = getWorkflowContext(workflowName); |
| JobContext jobContext = getJobContext(nameSpaceJobName); |
| // If workflow context or job context is null. It means job has not been started. Hence task can |
| // be added to the job |
| if (workflowContext != null && jobContext != null) { |
| TaskState jobState = workflowContext.getJobState(nameSpaceJobName); |
| if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) { |
| throw new HelixException("Job " + nameSpaceJobName |
| + " is in illegal state for task addition. Job State is " + jobState); |
| } |
| } |
| |
| DataUpdater<ZNRecord> updater = currentData -> { |
| if (currentData != null) { |
| currentData.setMapField(taskConfig.getId(), taskConfig.getConfigMap()); |
| } else { |
| LOG.error("JobConfig DataUpdater: Fails to update JobConfig. CurrentData is null."); |
| } |
| return currentData; |
| }; |
| |
| updateTaskInJobConfig(workflowName, jobName, updater); |
| |
| workflowContext = |
| _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName)); |
| jobContext = |
| _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName)); |
| |
| if (workflowContext == null || jobContext == null) { |
| return; |
| } |
| |
| String taskID = taskConfig.getId(); |
| while (System.currentTimeMillis() <= endTime) { |
| jobContext = |
| _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName)); |
| workflowContext = |
| _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName)); |
| if (jobContext.getTaskIdPartitionMap().containsKey(taskID) |
| && workflowContext.getJobState(nameSpaceJobName) == TaskState.IN_PROGRESS) { |
| return; |
| } |
| |
| Thread.sleep(DEFAULT_SLEEP); |
| } |
| throw new TimeoutException("An unexpected issue happened while task being added to the job!"); |
| } |
| |
| /** |
| * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet. |
| * Timeout for this operation is the default timeout which is 5 minutes. |
| * {@link TaskDriver#DEFAULT_TIMEOUT} |
| * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted |
| * from the job if the job is in-progress or it has not started yet. |
| * Note2: The tasks can only be deleted from non-targeted jobs. |
| * Note3: In case of timeout exception, it is the user's responsibility to check whether the task |
| * has been successfully deleted or not. |
| * Note4: timeout is the time that this API checks whether the task has been successfully deleted |
| * or not. |
| * @param workflowName |
| * @param jobName |
| * @param taskID |
| * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified |
| * @throws IllegalArgumentException if the inputs are invalid |
| * @throws HelixException if the job is not in the states to accept a new task or if there is any |
| * issue in updating jobConfig. |
| */ |
| public void deleteTask(String workflowName, String jobName, String taskID) |
| throws TimeoutException, InterruptedException { |
| deleteTask(workflowName, jobName, taskID, DEFAULT_TIMEOUT); |
| } |
| |
| /** |
| * Delete an existing task from a running (IN-PROGRESS) job or a job which has not started yet. |
| * Note1: Task cannot be deleted from the job which is in an illegal state. Task can be deleted |
| * from the job if the job is in-progress or it has not started yet. |
| * Note2: The tasks can only be deleted from non-targeted jobs. |
| * Note3: In case of timeout exception, it is the user's responsibility to check whether the task |
| * has been successfully deleted or not. |
| * Note4: timeout is the time that this API checks whether the task has been successfully deleted |
| * or not. |
| * @param workflowName |
| * @param jobName |
| * @param taskID |
| * @param timeoutMs |
| * @throws TimeoutException if the outcome of the task deletion is unknown and cannot be verified |
| * @throws IllegalArgumentException if the inputs are invalid |
| * @throws HelixException if the job is not in the states to accept a new task or if there is any |
| * issue in updating jobConfig. |
| */ |
| public void deleteTask(String workflowName, String jobName, String taskID, long timeoutMs) |
| throws TimeoutException, InterruptedException { |
| long endTime = System.currentTimeMillis() + timeoutMs; |
| |
| String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| JobConfig jobConfig = getJobConfig(nameSpaceJobName); |
| if (jobConfig == null) { |
| throw new IllegalArgumentException("Job " + nameSpaceJobName + " config does not exist!"); |
| } |
| |
| TaskConfig taskConfig = null; |
| Map<String, TaskConfig> allTaskConfigs = jobConfig.getTaskConfigMap(); |
| for (Map.Entry<String, TaskConfig> entry : allTaskConfigs.entrySet()) { |
| if (entry.getKey().equals(taskID)) { |
| taskConfig = entry.getValue(); |
| } |
| } |
| |
| validateConfigsForTaskModifications(workflowName, jobName, taskConfig); |
| |
| WorkflowContext workflowContext = getWorkflowContext(workflowName); |
| JobContext jobContext = getJobContext(nameSpaceJobName); |
| // If workflow context or job context is null. It means job has not been started. Hence task can |
| // be deleted from the job |
| if (workflowContext != null && jobContext != null) { |
| TaskState jobState = workflowContext.getJobState(nameSpaceJobName); |
| if (jobState != null && ILLEGAL_JOB_STATES_FOR_TASK_MODIFICATION.contains(jobState)) { |
| throw new HelixException("Job " + nameSpaceJobName |
| + " is in illegal state for task deletion. Job State is " + jobState); |
| } |
| } |
| |
| DataUpdater<ZNRecord> taskRemover = new DataUpdater<ZNRecord>() { |
| @Override |
| public ZNRecord update(ZNRecord currentData) { |
| if (currentData != null) { |
| Map<String, Map<String, String>> taskMap = currentData.getMapFields(); |
| if (taskMap == null) { |
| LOG.warn("Could not update the jobConfig: {} Znode MapField is null.", jobName ); |
| return null; |
| } |
| Map<String, Map<String, String>> newTaskMap = new HashMap<String, Map<String, String>>(); |
| for (Map.Entry<String, Map<String, String>> entry : taskMap.entrySet()) { |
| if (!entry.getKey().equals(taskID)) { |
| newTaskMap.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| currentData.setMapFields(newTaskMap); |
| } |
| return currentData; |
| } |
| }; |
| |
| updateTaskInJobConfig(workflowName, jobName, taskRemover); |
| |
| workflowContext = |
| _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName)); |
| jobContext = |
| _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName)); |
| |
| if (workflowContext == null || jobContext == null) { |
| return; |
| } |
| |
| while (System.currentTimeMillis() <= endTime) { |
| jobContext = |
| _accessor.getProperty(_accessor.keyBuilder().jobContextZNode(workflowName, jobName)); |
| workflowContext = |
| _accessor.getProperty(_accessor.keyBuilder().workflowContextZNode(workflowName)); |
| if (!jobContext.getTaskIdPartitionMap().containsKey(taskID)) { |
| return; |
| } |
| Thread.sleep(DEFAULT_SLEEP); |
| } |
| throw new TimeoutException( |
| "An unexpected issue happened while task being deleted from the job!"); |
| } |
| |
| /** |
| * The helper method which check the workflow, job and task configs to determine if new task can |
| * be added/deleted to/from the job |
| * @param workflowName |
| * @param jobName |
| * @param taskConfig |
| */ |
| private void validateConfigsForTaskModifications(String workflowName, String jobName, |
| TaskConfig taskConfig) { |
| WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflowName); |
| String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| JobConfig jobConfig = TaskUtil.getJobConfig(_accessor, nameSpaceJobName); |
| |
| if (workflowConfig == null) { |
| throw new IllegalArgumentException( |
| String.format("Workflow config for workflow %s does not exist!", workflowName)); |
| } |
| |
| if (jobConfig == null) { |
| throw new IllegalArgumentException( |
| String.format("Job config for job %s does not exist!", nameSpaceJobName)); |
| } |
| |
| if (taskConfig == null) { |
| throw new IllegalArgumentException("TaskConfig is null!"); |
| } |
| |
| if (taskConfig.getId() == null) { |
| throw new HelixException("Task cannot be added or deleted because taskID is null!"); |
| } |
| |
| if (jobConfig.getTargetResource() != null) { |
| throw new HelixException(String.format( |
| "Job %s is a targeted job. New task cannot be added/deleted to/from this job!", |
| nameSpaceJobName)); |
| } |
| |
| if ((taskConfig.getCommand() == null) == (jobConfig.getCommand() == null)) { |
| throw new HelixException(String |
| .format("Command must exist in either jobconfig (%s) or taskconfig (%s), not both!", jobName, |
| taskConfig.getId())); |
| } |
| } |
| |
| /** |
| * A helper method which updates the tasks within a the job config. |
| * @param workflowName |
| * @param jobName |
| * @param updater |
| */ |
| private void updateTaskInJobConfig(String workflowName, String jobName, |
| DataUpdater<ZNRecord> updater) { |
| String nameSpaceJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| String path = _accessor.keyBuilder().resourceConfig(nameSpaceJobName).getPath(); |
| boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT); |
| if (!status) { |
| LOG.error("Failed to update task in the job {}", nameSpaceJobName); |
| throw new HelixException("Failed to update task in the job"); |
| } |
| } |
| |
| /** |
| * Keep the old name of API for backward compatibility |
| * @param queue |
| */ |
| @Deprecated |
| public void cleanupJobQueue(String queue) { |
| cleanupQueue(queue); |
| } |
| |
| /** |
| * Remove all jobs that are in final states (ABORTED, FAILED, COMPLETED) from the job queue. The |
| * job config, job context will be removed from Zookeeper. |
| * @param queue The name of job queue |
| */ |
| public void cleanupQueue(String queue) { |
| WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue); |
| |
| if (workflowConfig == null) { |
| throw new IllegalArgumentException("Queue " + queue + " does not yet exist!"); |
| } |
| |
| if (!workflowConfig.isJobQueue() || workflowConfig.isTerminable()) { |
| throw new IllegalArgumentException(queue + " is not a queue!"); |
| } |
| |
| WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, queue); |
| if (wCtx == null || wCtx.getWorkflowState() == null) { |
| throw new IllegalStateException("Queue " + queue + " does not have a valid work state!"); |
| } |
| |
| Set<String> jobs = new HashSet<>(); |
| for (String jobNode : workflowConfig.getJobDag().getAllNodes()) { |
| TaskState curState = wCtx.getJobState(jobNode); |
| if (curState != null && curState == TaskState.ABORTED || curState == TaskState.COMPLETED |
| || curState == TaskState.FAILED) { |
| jobs.add(jobNode); |
| } |
| } |
| |
| TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true); |
| } |
| |
| /** |
| * Add new job config to cluster by way of create |
| */ |
| private void addJobConfig(String job, JobConfig jobConfig) { |
| LOG.info("Add job configuration " + job); |
| |
| // Set the job configuration |
| JobConfig newJobCfg = new JobConfig(job, jobConfig); |
| if (!TaskUtil.createJobConfig(_accessor, job, newJobCfg)) { |
| throw new HelixException("Failed to add job configuration for job " + job |
| + ". It's possible that a job of the same name already exists or there was a connection issue"); |
| } |
| } |
| |
| /** |
| * Public method to resume a workflow/queue. |
| * @param workflow |
| */ |
| public void resume(String workflow) { |
| setWorkflowTargetState(workflow, TargetState.START); |
| } |
| |
| /** |
| * Public async method to stop a workflow/queue. |
| * This call only send STOP command to Helix, it does not check |
| * whether the workflow (all jobs) has been stopped yet. |
| * @param workflow |
| */ |
| public void stop(String workflow) { |
| setWorkflowTargetState(workflow, TargetState.STOP); |
| } |
| |
| /** |
| * Public sync method to stop a workflow/queue with timeout |
| * Basically the workflow and all of its jobs has been stopped if this method return success. |
| * @param workflow The workflow name |
| * @param timeout The timeout for stopping workflow/queue in milisecond |
| */ |
| public void waitToStop(String workflow, long timeout) throws InterruptedException { |
| setWorkflowTargetState(workflow, TargetState.STOP); |
| long endTime = System.currentTimeMillis() + timeout; |
| |
| while (System.currentTimeMillis() <= endTime) { |
| WorkflowContext workflowContext = getWorkflowContext(workflow); |
| |
| if (workflowContext == null |
| || !TaskState.STOPPED.equals(workflowContext.getWorkflowState())) { |
| Thread.sleep(DEFAULT_SLEEP); |
| } else { |
| // Successfully stopped |
| return; |
| } |
| } |
| |
| // Failed to stop with timeout |
| throw new HelixException(String |
| .format("Fail to stop the workflow/queue %s with in %d milliseconds.", workflow, timeout)); |
| } |
| |
| /** |
| * Public method to delete a workflow/queue. |
| * @param workflow |
| */ |
| public void delete(String workflow) { |
| delete(workflow, false); |
| } |
| |
| /** |
| * Public method to delete a workflow/queue. |
| * @param workflow |
| * @param forceDelete, CAUTION: if set true, workflow and all of its jobs' related zk nodes will |
| * be clean up immediately from zookeeper, no matter whether there are jobs |
| * are running or not. |
| * Enabling this option can cause a ZooKeeper delete failure as Helix might |
| * inadvertently try to write the deleted ZNodes back to ZooKeeper. Also this option |
| * might corrupt Task Framework cache. At any rate, it shouldn't be used while the |
| * controller is running or the cluster is active. |
| */ |
| public void delete(String workflow, boolean forceDelete) { |
| WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow); |
| if (forceDelete) { |
| // if forceDelete, remove the workflow and its jobs immediately from zookeeper. |
| LOG.info("Forcefully removing workflow: " + workflow); |
| removeWorkflowFromZK(workflow); |
| } else { |
| // Set the workflow target state to DELETE, and let Helix controller to remove the workflow. |
| // Controller may remove the workflow instantly, so record context before set DELETE state. |
| setWorkflowTargetState(workflow, TargetState.DELETE); |
| } |
| |
| // Delete all previously scheduled workflows. |
| if (wCtx != null && wCtx.getScheduledWorkflows() != null) { |
| for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) { |
| if (forceDelete) { |
| // Only directly delete it if it's force delete. Otherwise, it will cause a race condition |
| // where contexts would get written back to ZK from cache |
| WorkflowContext scheduledWorkflowCtx = |
| TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow); |
| if (scheduledWorkflowCtx != null |
| && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { |
| removeWorkflowFromZK(scheduledWorkflow); |
| } |
| } else { |
| setWorkflowTargetState(scheduledWorkflow, TargetState.DELETE); |
| } |
| } |
| } |
| } |
| |
| private void removeWorkflowFromZK(String workflow) { |
| Set<String> jobSet = new HashSet<>(); |
| // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove |
| // workflow |
| WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow); |
| if (wCfg != null) { |
| jobSet.addAll(wCfg.getJobDag().getAllNodes()); |
| } |
| boolean success = TaskUtil.removeWorkflow(_accessor, _propertyStore, workflow, jobSet); |
| if (!success) { |
| LOG.info("Failed to delete the workflow " + workflow); |
| throw new HelixException("Failed to delete the workflow " + workflow); |
| } |
| } |
| |
| /** |
| * TODO: IdealStates are no longer used by Task Framework. This function deletes IdealStates for |
| * TODO: backward compatability purpose; this behavior will be removed later. |
| * Public synchronized method to wait for a delete operation to fully complete with timeout. |
| * When this method returns, it means that a queue (workflow) has been completely deleted, meaning |
| * its IdealState, WorkflowConfig, and WorkflowContext have all been deleted. |
| * @param workflow workflow/jobqueue name |
| * @param timeout duration to give to delete operation to completion |
| */ |
| public void deleteAndWaitForCompletion(String workflow, long timeout) |
| throws InterruptedException { |
| delete(workflow); |
| long endTime = System.currentTimeMillis() + timeout; |
| |
| // For checking whether delete completed |
| BaseDataAccessor baseDataAccessor = _accessor.getBaseDataAccessor(); |
| PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); |
| |
| String idealStatePath = keyBuilder.idealStates(workflow).getPath(); |
| String workflowConfigPath = keyBuilder.resourceConfig(workflow).getPath(); |
| String workflowContextPath = keyBuilder.workflowContext(workflow).getPath(); |
| |
| while (System.currentTimeMillis() <= endTime) { |
| if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT) |
| || baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT) |
| || baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) { |
| Thread.sleep(DEFAULT_SLEEP); |
| } else { |
| return; |
| } |
| } |
| |
| // Deletion failed: check which step of deletion failed to complete and create an error message |
| StringBuilder failed = new StringBuilder(); |
| if (baseDataAccessor.exists(idealStatePath, AccessOption.PERSISTENT)) { |
| failed.append("IdealState "); |
| } |
| if (baseDataAccessor.exists(workflowConfigPath, AccessOption.PERSISTENT)) { |
| failed.append("WorkflowConfig "); |
| } |
| if (baseDataAccessor.exists(workflowContextPath, AccessOption.PERSISTENT)) { |
| failed.append("WorkflowContext "); |
| } |
| throw new HelixException( |
| String.format( |
| "Failed to delete the workflow/queue %s within %d milliseconds. " |
| + "The following components still remain: %s", |
| workflow, timeout, failed.toString())); |
| } |
| |
| /** |
| * Helper function to change target state for a given workflow |
| */ |
| private void setWorkflowTargetState(String workflow, TargetState state) { |
| setSingleWorkflowTargetState(workflow, state); |
| |
| // For recurring schedules, last scheduled incomplete workflow must also be handled |
| WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow); |
| if (wCtx != null) { |
| String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow(); |
| if (lastScheduledWorkflow != null) { |
| setSingleWorkflowTargetState(lastScheduledWorkflow, state); |
| } |
| } |
| } |
| |
| /** |
| * Helper function to change target state for a given workflow |
| */ |
| private void setSingleWorkflowTargetState(String workflow, final TargetState state) { |
| LOG.info("Set " + workflow + " to target state " + state); |
| |
| WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, workflow); |
| if (workflowConfig == null) { |
| LOG.warn("WorkflowConfig for {} not found!", workflow); |
| return; |
| } |
| |
| WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, workflow); |
| if (state != TargetState.DELETE && workflowContext != null |
| && workflowContext.getFinishTime() != WorkflowContext.UNFINISHED) { |
| // Should not update target state for completed workflow |
| LOG.info("Workflow {} is already completed, skip to update its target state {}", workflow, |
| state); |
| return; |
| } |
| |
| DataUpdater<ZNRecord> updater = currentData -> { |
| if (currentData != null) { |
| currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), |
| state.name()); |
| } else { |
| LOG.warn( |
| "TargetState DataUpdater: Fails to update target state for {}. CurrentData is null.", |
| workflow); |
| } |
| return currentData; |
| }; |
| |
| PropertyKey workflowConfigKey = TaskUtil.getWorkflowConfigKey(_accessor, workflow); |
| _accessor.getBaseDataAccessor().update(workflowConfigKey.getPath(), updater, |
| AccessOption.PERSISTENT); |
| } |
| |
| public WorkflowConfig getWorkflowConfig(String workflow) { |
| return TaskUtil.getWorkflowConfig(_accessor, workflow); |
| } |
| |
| public WorkflowContext getWorkflowContext(String workflow) { |
| return TaskUtil.getWorkflowContext(_propertyStore, workflow); |
| } |
| |
| public JobConfig getJobConfig(String job) { |
| return TaskUtil.getJobConfig(_accessor, job); |
| } |
| |
| public JobContext getJobContext(String job) { |
| return TaskUtil.getJobContext(_propertyStore, job); |
| } |
| |
| public static JobContext getJobContext(HelixManager manager, String job) { |
| return TaskUtil.getJobContext(manager, job); |
| } |
| |
| public static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) { |
| return TaskUtil.getWorkflowConfig(manager, workflow); |
| } |
| |
| public static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) { |
| return TaskUtil.getWorkflowContext(manager, workflow); |
| } |
| |
| public static JobConfig getJobConfig(HelixManager manager, String job) { |
| return TaskUtil.getJobConfig(manager, job); |
| } |
| |
| /** |
| * Batch get the configurations of all workflows in this cluster. |
| * @return |
| */ |
| public Map<String, WorkflowConfig> getWorkflows() { |
| Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>(); |
| Map<String, ResourceConfig> resourceConfigMap = |
| _accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs(), true); |
| |
| for (Map.Entry<String, ResourceConfig> resource : resourceConfigMap.entrySet()) { |
| try { |
| WorkflowConfig config = WorkflowConfig.fromHelixProperty(resource.getValue()); |
| workflowConfigMap.put(resource.getKey(), config); |
| } catch (IllegalArgumentException ex) { |
| // ignore if it is not a workflow config. |
| } |
| } |
| |
| return workflowConfigMap; |
| } |
| |
| /** |
| * This call will be blocked until either workflow reaches to one of the state specified |
| * in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException |
| * Otherwise, it will return current workflow state |
| * @param workflowName The workflow to be monitored |
| * @param timeout A long integer presents the time out, in milliseconds |
| * @param targetStates Specified states that user would like to stop monitoring |
| * @return A TaskState, which is current workflow state |
| * @throws InterruptedException |
| */ |
| public TaskState pollForWorkflowState(String workflowName, long timeout, |
| TaskState... targetStates) throws InterruptedException { |
| // Wait for completion. |
| long st = System.currentTimeMillis(); |
| WorkflowContext ctx; |
| Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(targetStates)); |
| |
| long timeToSleep = timeout > 100L ? 100L : timeout; |
| do { |
| Thread.sleep(timeToSleep); |
| ctx = getWorkflowContext(workflowName); |
| } while ((ctx == null || ctx.getWorkflowState() == null |
| || !allowedStates.contains(ctx.getWorkflowState())) |
| && System.currentTimeMillis() < st + timeout); |
| |
| if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) { |
| throw new HelixException(String.format( |
| "Workflow %s context is empty or not in states: %s, current state: %s.", |
| workflowName, Arrays.asList(targetStates), |
| ctx == null ? "null" : ctx.getWorkflowState().toString())); |
| } |
| |
| return ctx.getWorkflowState(); |
| } |
| |
| /** |
| * This is a wrapper function that set default time out for monitoring workflow in 2 MINUTES. |
| * If timeout happens, then it will throw a HelixException, Otherwise, it will return |
| * current job state. |
| * @param workflowName The workflow to be monitored |
| * @param targetStates Specified states that user would like to stop monitoring |
| * @return A TaskState, which is current workflow state |
| * @throws InterruptedException |
| */ |
| public TaskState pollForWorkflowState(String workflowName, TaskState... targetStates) |
| throws InterruptedException { |
| return pollForWorkflowState(workflowName, DEFAULT_TIMEOUT, targetStates); |
| } |
| |
| /** |
| * This call will be blocked until either specified job reaches to one of the state |
| * in the arguments, or timeout happens. If timeout happens, then it will throw a HelixException |
| * Otherwise, it will return current job state |
| * @param workflowName The workflow that contains the job to monitor |
| * @param jobName The specified job to monitor |
| * @param timeout A long integer presents the time out, in milliseconds |
| * @param states Specified states that user would like to stop monitoring |
| * @return A TaskState, which is current job state |
| * @throws Exception |
| */ |
| public TaskState pollForJobState(String workflowName, String jobName, long timeout, |
| TaskState... states) throws InterruptedException { |
| // Get workflow config |
| WorkflowConfig workflowConfig = getWorkflowConfig(workflowName); |
| |
| if (workflowConfig == null) { |
| throw new HelixException(String.format("Workflow %s does not exists!", workflowName)); |
| } |
| |
| long timeToSleep = timeout > 50L ? 50L : timeout; |
| |
| WorkflowContext ctx; |
| if (workflowConfig.isRecurring()) { |
| // if it's recurring, need to reconstruct workflow and job name |
| do { |
| Thread.sleep(timeToSleep); |
| ctx = getWorkflowContext(workflowName); |
| } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null)); |
| |
| jobName = jobName.substring(workflowName.length() + 1); |
| workflowName = ctx.getLastScheduledSingleWorkflow(); |
| } |
| |
| Set<TaskState> allowedStates = new HashSet<>(Arrays.asList(states)); |
| // Wait for state |
| long st = System.currentTimeMillis(); |
| do { |
| Thread.sleep(timeToSleep); |
| ctx = getWorkflowContext(workflowName); |
| } while ((ctx == null || ctx.getJobState(jobName) == null |
| || !allowedStates.contains(ctx.getJobState(jobName))) |
| && System.currentTimeMillis() < st + timeout); |
| |
| if (ctx == null || !allowedStates.contains(ctx.getJobState(jobName))) { |
| WorkflowConfig wfcfg = getWorkflowConfig(workflowName); |
| JobConfig jobConfig = getJobConfig(jobName); |
| JobContext jbCtx = getJobContext(jobName); |
| throw new HelixException(String.format( |
| "Workflow %s context is null or job %s is not in states: %s; ctx is %s, jobState is %s, wf cfg %s, jobcfg %s, jbctx %s", |
| workflowName, jobName, allowedStates, ctx == null ? "null" : ctx, |
| ctx != null ? ctx.getJobState(jobName) : "null", wfcfg, jobConfig, jbCtx)); |
| |
| } |
| |
| return ctx.getJobState(jobName); |
| } |
| |
| /** |
| * This is a wrapper function for monitoring job state with default timeout 2 MINUTES. |
| * If timeout happens, then it will throw a HelixException, Otherwise, it will return |
| * current job state |
| * @param workflowName The workflow that contains the job to monitor |
| * @param jobName The specified job to monitor |
| * @param states Specified states that user would like to stop monitoring |
| * @return A TaskState, which is current job state |
| * @throws Exception |
| */ |
| public TaskState pollForJobState(String workflowName, String jobName, TaskState... states) |
| throws InterruptedException { |
| return pollForJobState(workflowName, jobName, DEFAULT_TIMEOUT, states); |
| } |
| |
| /** |
| * This function returns the timestamp of the very last task that was scheduled. It is provided to |
| * help determine |
| * whether a given Workflow/Job/Task is stuck. |
| * @param workflowName The name of the workflow |
| * @return timestamp of the most recent job scheduled. |
| * -1L if timestamp is not set (either nothing is scheduled or no start time recorded). |
| */ |
| public long getLastScheduledTaskTimestamp(String workflowName) { |
| return getLastScheduledTaskExecutionInfo(workflowName).getStartTimeStamp(); |
| } |
| |
| public TaskExecutionInfo getLastScheduledTaskExecutionInfo(String workflowName) { |
| long lastScheduledTaskTimestamp = TaskExecutionInfo.TIMESTAMP_NOT_SET; |
| String jobName = null; |
| Integer taskPartitionIndex = null; |
| TaskPartitionState state = null; |
| |
| WorkflowContext workflowContext = getWorkflowContext(workflowName); |
| if (workflowContext != null) { |
| Map<String, TaskState> allJobStates = workflowContext.getJobStates(); |
| for (Map.Entry<String, TaskState> jobState : allJobStates.entrySet()) { |
| if (!jobState.getValue().equals(TaskState.NOT_STARTED)) { |
| JobContext jobContext = getJobContext(jobState.getKey()); |
| if (jobContext != null) { |
| Set<Integer> allPartitions = jobContext.getPartitionSet(); |
| for (Integer partition : allPartitions) { |
| String startTime = jobContext.getMapField(partition).get(TASK_START_TIME_KEY); |
| if (startTime != null) { |
| long startTimeLong = Long.parseLong(startTime); |
| if (startTimeLong > lastScheduledTaskTimestamp) { |
| lastScheduledTaskTimestamp = startTimeLong; |
| jobName = jobState.getKey(); |
| taskPartitionIndex = partition; |
| state = jobContext.getPartitionState(partition); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| return new TaskExecutionInfo(jobName, taskPartitionIndex, state, lastScheduledTaskTimestamp); |
| } |
| |
| /** |
| * Returns the lookup of UserContentStore by key. |
| * @param key key used at write time by a task implementing UserContentStore |
| * @param scope scope used at write time |
| * @param workflowName name of workflow. Must be supplied |
| * @param jobName name of job. Optional if scope is WORKFLOW |
| * @param taskName name of task. Optional if scope is WORKFLOW or JOB |
| * @return null if key-value pair not found or this content store does not exist. Otherwise, |
| * return a String |
| * @deprecated use the following equivalents: {@link #getWorkflowUserContentMap(String)}, |
| * {@link #getJobUserContentMap(String, String)}, |
| * @{{@link #getTaskContentMap(String, String, String)}} |
| */ |
| @Deprecated |
| public String getUserContent(String key, UserContentStore.Scope scope, String workflowName, |
| String jobName, String taskName) { |
| return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName); |
| } |
| |
| /** |
| * Return the full user content map for workflow |
| * @param workflowName workflow name |
| * @return user content map |
| */ |
| public Map<String, String> getWorkflowUserContentMap(String workflowName) { |
| return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, workflowName); |
| } |
| |
| /** |
| * Return full user content map for job |
| * @param workflowName workflow name |
| * @param jobName Un-namespaced job name |
| * @return user content map |
| */ |
| public Map<String, String> getJobUserContentMap(String workflowName, String jobName) { |
| String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| return TaskUtil.getWorkflowJobUserContentMap(_propertyStore, namespacedJobName); |
| } |
| |
| /** |
| * Return full user content map for task |
| * @param workflowName workflow name |
| * @param jobName Un-namespaced job name |
| * @param taskPartitionId task partition id |
| * @return user content map |
| */ |
| public Map<String, String> getTaskUserContentMap(String workflowName, String jobName, |
| String taskPartitionId) { |
| String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId); |
| return TaskUtil.getTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName); |
| } |
| |
| /** |
| * Add or update workflow user content with the given map - new keys will be added, and old |
| * keys will be updated |
| * @param workflowName workflow name |
| * @param contentToAddOrUpdate map containing items to add or update |
| */ |
| public void addOrUpdateWorkflowUserContentMap(String workflowName, |
| final Map<String, String> contentToAddOrUpdate) { |
| TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName, |
| contentToAddOrUpdate); |
| } |
| |
| /** |
| * Add or update job user content with the given map - new keys will be added, and old keys will |
| * be updated |
| * @param workflowName workflow name |
| * @param jobName Un-namespaced job name |
| * @param contentToAddOrUpdate map containing items to add or update |
| */ |
| public void addOrUpdateJobUserContentMap(String workflowName, String jobName, |
| final Map<String, String> contentToAddOrUpdate) { |
| String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, namespacedJobName, |
| contentToAddOrUpdate); |
| } |
| |
| /** |
| * Add or update task user content with the given map - new keys will be added, and old keys |
| * will be updated |
| * @param workflowName workflow name |
| * @param jobName Un-namespaced job name |
| * @param taskPartitionId task partition id |
| * @param contentToAddOrUpdate map containing items to add or update |
| */ |
| public void addOrUpdateTaskUserContentMap(String workflowName, String jobName, |
| String taskPartitionId, final Map<String, String> contentToAddOrUpdate) { |
| String namespacedJobName = TaskUtil.getNamespacedJobName(workflowName, jobName); |
| String namespacedTaskName = TaskUtil.getNamespacedTaskName(namespacedJobName, taskPartitionId); |
| TaskUtil.addOrUpdateTaskUserContentMap(_propertyStore, namespacedJobName, namespacedTaskName, |
| contentToAddOrUpdate); |
| } |
| |
| /** |
| * Throw Exception if children nodes will exceed limitation after adding newNodesCount children. |
| * @param newConfigNodeCount |
| */ |
| private void validateZKNodeLimitation(int newConfigNodeCount) { |
| List<String> resourceConfigs = |
| _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs()); |
| if (resourceConfigs.size() + newConfigNodeCount > _configsLimitation) { |
| throw new HelixException( |
| "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS."); |
| } |
| } |
| |
| /** |
| * Get the target task thread pool size of an instance, a value that's used to construct the task |
| * thread pool and is created by users. |
| * @param instanceName - name of the instance |
| * @return the target task thread pool size of the instance |
| */ |
| public int getTargetTaskThreadPoolSize(String instanceName) { |
| InstanceConfig instanceConfig = getInstanceConfig(instanceName); |
| return instanceConfig.getTargetTaskThreadPoolSize(); |
| } |
| |
| /** |
| * Set the target task thread pool size of an instance. The target task thread pool size goes to |
| * InstanceConfig, and is used to construct the task thread pool. The newly-set target task |
| * thread pool size will take effect upon a JVM restart. |
| * @param instanceName - name of the instance |
| * @param targetTaskThreadPoolSize - the target task thread pool size of the instance |
| */ |
| public void setTargetTaskThreadPoolSize(String instanceName, int targetTaskThreadPoolSize) { |
| InstanceConfig instanceConfig = getInstanceConfig(instanceName); |
| instanceConfig.setTargetTaskThreadPoolSize(targetTaskThreadPoolSize); |
| _accessor.setProperty(_accessor.keyBuilder().instanceConfig(instanceName), instanceConfig); |
| } |
| |
| private InstanceConfig getInstanceConfig(String instanceName) { |
| InstanceConfig instanceConfig = |
| _accessor.getProperty(_accessor.keyBuilder().instanceConfig(instanceName)); |
| if (instanceConfig == null) { |
| throw new IllegalArgumentException( |
| "Failed to find InstanceConfig with provided instance name " + instanceName + "!"); |
| } |
| return instanceConfig; |
| } |
| |
| /** |
| * Get the global target task thread pool size of the cluster, a value that's used to construct |
| * task thread pools for the cluster's instances and is created by users. |
| * @return the global target task thread pool size of the cluster |
| */ |
| public int getGlobalTargetTaskThreadPoolSize() { |
| ClusterConfig clusterConfig = getClusterConfig(); |
| return clusterConfig.getGlobalTargetTaskThreadPoolSize(); |
| } |
| |
| /** |
| * Set the global target task thread pool size of the cluster. The global target task thread pool |
| * size goes to ClusterConfig, and is applied to all instances of the cluster. If an instance |
| * doesn't specify its target thread pool size in InstanceConfig, then this value in ClusterConfig |
| * will be used to construct its task thread pool. The newly-set target task thread pool size will |
| * take effect upon a JVM restart. If none of the global and per-instance target thread pool sizes |
| * are set, a default size will be used. |
| * @param globalTargetTaskThreadPoolSize - the global target task thread pool size of the cluster |
| */ |
| public void setGlobalTargetTaskThreadPoolSize(int globalTargetTaskThreadPoolSize) { |
| ClusterConfig clusterConfig = getClusterConfig(); |
| clusterConfig.setGlobalTargetTaskThreadPoolSize(globalTargetTaskThreadPoolSize); |
| _accessor.setProperty(_accessor.keyBuilder().clusterConfig(), clusterConfig); |
| } |
| |
| private ClusterConfig getClusterConfig() { |
| ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig()); |
| if (clusterConfig == null) { |
| throw new IllegalStateException( |
| "Failed to find ClusterConfig for cluster " + _clusterName + "!"); |
| } |
| return clusterConfig; |
| } |
| |
| /** |
| * Get the current target task thread pool size of an instance. This value reflects the current |
| * task thread pool size that's already created on the instance, and may be different from the |
| * target thread pool size. |
| * @param instanceName - name of the instance |
| * @return the current task thread pool size of the instance |
| */ |
| public int getCurrentTaskThreadPoolSize(String instanceName) { |
| LiveInstance liveInstance = |
| _accessor.getProperty(_accessor.keyBuilder().liveInstance(instanceName)); |
| if (liveInstance == null) { |
| throw new IllegalArgumentException( |
| "Failed to find LiveInstance with provided instance name " + instanceName + "!"); |
| } |
| |
| return liveInstance.getCurrentTaskThreadPoolSize(); |
| } |
| } |