| package org.apache.helix.task; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.security.InvalidParameterException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Stack; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.Sets; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.HelixProperty; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; |
| import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; |
| import org.apache.helix.manager.zk.ZKUtil; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.ResourceConfig; |
| import org.apache.helix.model.builder.HelixConfigScopeBuilder; |
| import org.apache.helix.store.HelixPropertyStore; |
| import org.apache.helix.util.RebalanceUtil; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.zkclient.DataUpdater; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Static utility methods. |
| */ |
| public class TaskUtil { |
| private static final Logger LOG = LoggerFactory.getLogger(TaskUtil.class); |
| public static final String CONTEXT_NODE = "Context"; |
| public static final String USER_CONTENT_NODE = "UserContent"; |
| public static final String WORKFLOW_CONTEXT_KW = "WorkflowContext"; |
| public static final String TASK_CONTEXT_KW = "TaskContext"; |
| |
| /** |
| * Parses job resource configurations in Helix into a {@link JobConfig} object. |
| * This method is an internal API; please use TaskDriver.getJobConfig() instead. |
| * @param accessor Accessor to access Helix configs |
| * @param job The name of the job resource |
| * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null |
| * otherwise. |
| */ |
| protected static JobConfig getJobConfig(HelixDataAccessor accessor, String job) { |
| HelixProperty jobResourceConfig = getResourceConfig(accessor, job); |
| if (jobResourceConfig == null) { |
| return null; |
| } |
| return new JobConfig(jobResourceConfig); |
| } |
| |
| /** |
| * Parses job resource configurations in Helix into a {@link JobConfig} object. |
| * This method is an internal API; please use TaskDriver.getJobConfig() instead. |
| * @param manager HelixManager object used to connect to Helix. |
| * @param job The name of the job resource. |
| * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null |
| * otherwise. |
| */ |
| protected static JobConfig getJobConfig(HelixManager manager, String job) { |
| return getJobConfig(manager.getHelixDataAccessor(), job); |
| } |
| |
| /** |
| * Creates a job config. Returns false if a job config of the same name already exists. |
| * @param accessor Accessor to Helix configs |
| * @param job The job name |
| * @param jobConfig The job config to be set |
| * @return True if created successfully, otherwise false |
| */ |
| protected static boolean createJobConfig(HelixDataAccessor accessor, String job, |
| JobConfig jobConfig) { |
| return createResourceConfig(accessor, job, jobConfig); |
| } |
| |
| /** |
| * Remove a job config. |
| * @param accessor Accessor to Helix configs |
| * @param job The job name |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeJobConfig(HelixDataAccessor accessor, String job) { |
| return removeWorkflowJobConfig(accessor, job); |
| } |
| |
| /** |
| * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. |
| * This method is an internal API; please use TaskDriver.getWorkflowConfig() instead. |
| * @param accessor Accessor to access Helix configs |
| * @param workflow The name of the workflow. |
| * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the |
| * workflow, null otherwise. |
| */ |
| protected static WorkflowConfig getWorkflowConfig(HelixDataAccessor accessor, String workflow) { |
| HelixProperty workflowCfg = getResourceConfig(accessor, workflow); |
| if (workflowCfg == null) { |
| return null; |
| } |
| |
| return new WorkflowConfig(workflowCfg); |
| } |
| |
| /** |
| * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. |
| * This method is an internal API; please use TaskDriver.getWorkflowConfig() instead. |
| * @param manager Helix manager object used to connect to Helix. |
| * @param workflow The name of the workflow resource. |
| * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the |
| * workflow, null otherwise. |
| */ |
| protected static WorkflowConfig getWorkflowConfig(HelixManager manager, String workflow) { |
| return getWorkflowConfig(manager.getHelixDataAccessor(), workflow); |
| } |
| |
| /** |
| * Create the workflow config. Fails if the ZNode already exists. |
| * @param accessor Accessor to Helix configs |
| * @param workflow The workflow name |
| * @param workflowConfig The workflow config to be created |
| * @return True if created successfully, otherwise false |
| */ |
| protected static boolean createWorkflowConfig(HelixDataAccessor accessor, String workflow, |
| WorkflowConfig workflowConfig) { |
| return createResourceConfig(accessor, workflow, workflowConfig); |
| } |
| |
| /** |
| * Set the workflow config |
| * @param accessor Accessor to Helix configs |
| * @param workflow The workflow name |
| * @param workflowConfig The workflow config to be set |
| * @return True if set successfully, otherwise false |
| */ |
| protected static boolean setWorkflowConfig(HelixDataAccessor accessor, String workflow, |
| WorkflowConfig workflowConfig) { |
| return setResourceConfig(accessor, workflow, workflowConfig); |
| } |
| |
| /** |
| * Remove a workflow config. |
| * @param accessor Accessor to Helix configs |
| * @param workflow The workflow name |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeWorkflowConfig(HelixDataAccessor accessor, String workflow) { |
| return removeWorkflowJobConfig(accessor, workflow); |
| } |
| |
| /** |
| * Get a Helix configuration scope at the resource (i.e. job or workflow) level |
| * @param clusterName the cluster containing the resource |
| * @param resource the resource name |
| * @return instantiated {@link HelixConfigScope} |
| */ |
| protected static HelixConfigScope getResourceConfigScope(String clusterName, String resource) { |
| return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE) |
| .forCluster(clusterName).forResource(resource).build(); |
| } |
| |
| /** |
| * Get the runtime context of a single job. |
| * This method is an internal API; please use TaskDriver.getJobContext() instead. |
| * @param propertyStore Property store for the cluster |
| * @param jobResource The name of the job |
| * @return the {@link JobContext}, or null if none is available |
| */ |
| protected static JobContext getJobContext(HelixPropertyStore<ZNRecord> propertyStore, |
| String jobResource) { |
| if (jobResource == null) { |
| throw new InvalidParameterException("Null job name is not allowed"); |
| } |
| ZNRecord r = propertyStore.get( |
| Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), null, |
| AccessOption.PERSISTENT); |
| return r != null ? new JobContext(r) : null; |
| } |
| |
| /** |
| * Get the runtime context of a single job. |
| * This method is an internal API; please use TaskDriver.getJobContext() instead. |
| * @param manager a connection to Helix |
| * @param jobResource the name of the job |
| * @return the {@link JobContext}, or null if none is available |
| */ |
| protected static JobContext getJobContext(HelixManager manager, String jobResource) { |
| return getJobContext(manager.getHelixPropertyStore(), jobResource); |
| } |
| |
| /** |
| * Set the runtime context of a single job |
| * This method is an internal API. |
| * @param manager a connection to Helix |
| * @param jobResource the name of the job |
| * @param ctx the up-to-date {@link JobContext} for the job |
| */ |
| protected static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) { |
| manager.getHelixPropertyStore().set( |
| Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), |
| ctx.getRecord(), AccessOption.PERSISTENT); |
| } |
| |
| /** |
| * Remove the runtime context of a single job. |
| * This method is an internal API. |
| * @param manager A connection to Helix |
| * @param jobResource The name of the job |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeJobContext(HelixManager manager, String jobResource) { |
| return removeJobContext(manager.getHelixPropertyStore(), jobResource); |
| } |
| |
| /** |
| * Remove the runtime context of a single job. |
| * This method is an internal API. |
| * @param propertyStore Property store for the cluster |
| * @param job The name of the job |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore, |
| String job) { |
| return removeWorkflowJobContext(propertyStore, job); |
| } |
| |
| /** |
| * Get the runtime context of a single workflow. |
| * This method is an internal API; please use TaskDriver.getWorkflowContext() instead. |
| * @param propertyStore Property store of the cluster |
| * @param workflow The name of the workflow |
| * @return the {@link WorkflowContext}, or null if none is available |
| */ |
| protected static WorkflowContext getWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore, |
| String workflow) { |
| ZNRecord r = propertyStore.get( |
| Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE), null, |
| AccessOption.PERSISTENT); |
| return r != null ? new WorkflowContext(r) : null; |
| } |
| |
| /** |
| * Get the runtime context of a single workflow. |
| * This method is an internal API; please use TaskDriver.getWorkflowContext() instead. |
| * @param manager a connection to Helix |
| * @param workflow the name of the workflow |
| * @return the {@link WorkflowContext}, or null if none is available |
| */ |
| protected static WorkflowContext getWorkflowContext(HelixManager manager, String workflow) { |
| return getWorkflowContext(manager.getHelixPropertyStore(), workflow); |
| } |
| |
| /** |
| * Set the runtime context of a single workflow |
| * @param manager a connection to Helix |
| * @param workflow the name of the workflow |
| * @param workflowContext the up-to-date {@link WorkflowContext} for the workflow |
| */ |
| protected static void setWorkflowContext(HelixManager manager, String workflow, |
| WorkflowContext workflowContext) { |
| manager.getHelixPropertyStore().set( |
| Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, CONTEXT_NODE), |
| workflowContext.getRecord(), AccessOption.PERSISTENT); |
| } |
| |
| /** |
| * Remove the runtime context of a single workflow. |
| * This method is an internal API. |
| * @param manager A connection to Helix |
| * @param workflow The name of the workflow |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeWorkflowContext(HelixManager manager, String workflow) { |
| return removeWorkflowContext(manager.getHelixPropertyStore(), workflow); |
| } |
| |
| /** |
| * Remove the runtime context of a single workflow. |
| * This method is an internal API. |
| * @param propertyStore Property store for the cluster |
| * @param workflow The name of the workflow |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeWorkflowContext(HelixPropertyStore<ZNRecord> propertyStore, |
| String workflow) { |
| return removeWorkflowJobContext(propertyStore, workflow); |
| } |
| |
| /** |
| * Initialize the user content store ZNode |
| * @param propertyStore zookeeper property store |
| * @param workflowJobResource the name of workflow or job |
| * @param record the initial data |
| */ |
| protected static void createUserContent(HelixPropertyStore<ZNRecord> propertyStore, |
| String workflowJobResource, ZNRecord record) { |
| propertyStore.create(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, |
| workflowJobResource, TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT); |
| } |
| |
| /** |
| * Get user-defined workflow/job scope key-value pair data. This method takes a |
| * {@code HelixPropertyStore<ZNRecord>} directly. |
| * @param propertyStore Property store for the cluster |
| * @param workflowJobResource the name of the workflow or job |
| * @param key the key of the key-value pair |
| * @return null if there is no such pair, otherwise return a String |
| */ |
| protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore, |
| String workflowJobResource, String key) { |
| Map<String, String> userContentMap = |
| getWorkflowJobUserContentMap(propertyStore, workflowJobResource); |
| return userContentMap != null ? userContentMap.get(key) : null; |
| } |
| |
| /** |
| * Get the workflow/job user content map |
| * @param propertyStore property store |
| * @param workflowJobResource workflow name or namespaced job name |
| * @return user content map |
| */ |
| protected static Map<String, String> getWorkflowJobUserContentMap( |
| HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource) { |
| ZNRecord record = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, |
| workflowJobResource, USER_CONTENT_NODE), null, AccessOption.PERSISTENT); |
| return record != null ? record.getSimpleFields() : null; |
| } |
| |
| /** |
| * Add a user-defined key-value pair at the workflow or job level |
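| * <p>Illustrative usage (assumes {@code manager} is an already-connected {@link HelixManager}; |
| * the names and values are placeholders): |
| * <pre>{@code |
| * TaskUtil.addWorkflowJobUserContent(manager, "myWorkflow", "lastCheckpoint", "12345"); |
| * String value = TaskUtil.getWorkflowJobUserContent( |
| *     manager.getHelixPropertyStore(), "myWorkflow", "lastCheckpoint"); |
| * }</pre> |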
| * @param manager a connection to Helix |
| * @param workflowJobResource the name of workflow or job |
| * @param key the key of key-value pair |
| * @param value the value of key-value pair |
| */ |
| protected static void addWorkflowJobUserContent(final HelixManager manager, |
| String workflowJobResource, final String key, final String value) { |
| addOrUpdateWorkflowJobUserContentMap(manager.getHelixPropertyStore(), workflowJobResource, |
| Collections.singletonMap(key, value)); |
| } |
| |
| /* package */ |
| static void addOrUpdateWorkflowJobUserContentMap(final HelixPropertyStore<ZNRecord> propertyStore, |
| String workflowJobResource, final Map<String, String> contentToAddOrUpdate) { |
| if (workflowJobResource == null) { |
| throw new IllegalArgumentException( |
| "workflowJobResource must be not null when adding workflow / job user content"); |
| } |
| String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, |
| USER_CONTENT_NODE); |
| |
| if (!propertyStore.update(path, new DataUpdater<ZNRecord>() { |
| @Override |
| public ZNRecord update(ZNRecord znRecord) { |
| if (znRecord == null) { |
| // This indicates that somehow the UserContentStore ZNode is missing |
| // This should not happen, but if it is missing, create one |
| znRecord = new ZNRecord(TaskUtil.USER_CONTENT_NODE); |
| } |
| znRecord.getSimpleFields().putAll(contentToAddOrUpdate); |
| return znRecord; |
| } |
| }, AccessOption.PERSISTENT)) { |
| LOG.error("Failed to update the UserContentStore for {}", workflowJobResource); |
| } |
| } |
| |
| /** |
| * Get user-defined task-level key-value pair data |
| * @param propertyStore Property store for the cluster |
| * @param job the name of the job |
| * @param task the name of the task |
| * @param key the key of the key-value pair |
| * @return null if there is no such pair, otherwise return a String |
| */ |
| protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job, |
| String task, String key) { |
| Map<String, String> userContentStore = getTaskUserContentMap(propertyStore, job, task); |
| return userContentStore != null ? userContentStore.get(key) : null; |
| } |
| |
| /** |
| * Return the full task user content map |
| * @param propertyStore property store |
| * @param namespacedJobName namespaced job name |
| * @param taskPartitionId task partition id |
| * @return the task user content map, or null if none exists |
| */ |
| protected static Map<String, String> getTaskUserContentMap( |
| HelixPropertyStore<ZNRecord> propertyStore, String namespacedJobName, |
| String taskPartitionId) { |
| ZNRecord record = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, |
| namespacedJobName, USER_CONTENT_NODE), null, AccessOption.PERSISTENT); |
| return record != null ? record.getMapField(taskPartitionId) : null; |
| } |
| |
| /** |
| * Add a user-defined key-value pair at the task level |
| * @param manager a connection to Helix |
| * @param job the name of job |
| * @param task the name of task |
| * @param key the key of key-value pair |
| * @param value the value of key-value pair |
| */ |
| protected static void addTaskUserContent(final HelixManager manager, String job, |
| final String task, final String key, final String value) { |
| addOrUpdateTaskUserContentMap(manager.getHelixPropertyStore(), job, task, |
| Collections.singletonMap(key, value)); |
| } |
| |
| /* package */ |
| static void addOrUpdateTaskUserContentMap(final HelixPropertyStore<ZNRecord> propertyStore, |
| final String job, final String task, final Map<String, String> contentToAddOrUpdate) { |
| if (job == null || task == null) { |
| throw new IllegalArgumentException( |
| "job and task must be not null when adding task user content"); |
| } |
| String path = |
| Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE); |
| |
| if (!propertyStore.update(path, new DataUpdater<ZNRecord>() { |
| @Override |
| public ZNRecord update(ZNRecord znRecord) { |
| if (znRecord == null) { |
| // This indicates that somehow the UserContentStore ZNode is missing |
| // This should not happen, but if it is missing, create one |
| znRecord = new ZNRecord(TaskUtil.USER_CONTENT_NODE); |
| } |
| if (znRecord.getMapField(task) == null) { |
| znRecord.setMapField(task, new HashMap<String, String>()); |
| } |
| znRecord.getMapField(task).putAll(contentToAddOrUpdate); |
| return znRecord; |
| } |
| }, AccessOption.PERSISTENT)) { |
| LOG.error("Failed to update the task UserContentStore for task {} in job {}", task, job); |
| } |
| } |
| |
| /** |
| * Helper method for looking up UserContentStore content. |
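| * <p>Illustrative usage (the names are placeholders; for JOB scope the task name is unused): |
| * <pre>{@code |
| * String value = TaskUtil.getUserContent(propertyStore, "myKey", |
| *     UserContentStore.Scope.JOB, "myWorkflow", "myWorkflow_myJob", null); |
| * }</pre> |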
| * @param propertyStore Property store for the cluster |
| * @param key the key of the key-value pair |
| * @param scope the scope (WORKFLOW, JOB, or TASK) at which to look up the content |
| * @param workflowName the workflow name, used for WORKFLOW scope |
| * @param jobName the namespaced job name, used for JOB and TASK scopes |
| * @param taskName the task name, used for TASK scope |
| * @return value corresponding to the key |
| */ |
| protected static String getUserContent(HelixPropertyStore propertyStore, String key, |
| UserContentStore.Scope scope, String workflowName, String jobName, String taskName) { |
| switch (scope) { |
| case WORKFLOW: |
| return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, key); |
| case JOB: |
| return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key); |
| case TASK: |
| return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, key); |
| default: |
| throw new HelixException("Invalid scope : " + scope.name()); |
| } |
| } |
| |
| /** |
| * Get a workflow-qualified job name for a single-job workflow |
| * @param singleJobWorkflow the name of the single-job workflow |
| * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow |
| */ |
| public static String getNamespacedJobName(String singleJobWorkflow) { |
| return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow); |
| } |
| |
| /** |
| * Get a workflow-qualified job name for a job in that workflow |
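| * <p>Illustrative example: {@code getNamespacedJobName("myQueue", "job1")} returns |
| * {@code "myQueue_job1"}. |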
| * @param workflow the name of the workflow |
| * @param jobName the un-namespaced name of the job |
| * @return The namespaced job name, which is just workflowResource_jobName |
| */ |
| public static String getNamespacedJobName(String workflow, String jobName) { |
| return workflow + "_" + jobName; |
| } |
| |
| /** |
| * Get a task name, namespaced by its job and workflow |
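| * <p>Illustrative example: {@code getNamespacedTaskName("myQueue_job1", "2")} returns |
| * {@code "myQueue_job1_2"}. |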
| * @param namespacedJobName namespaced job name |
| * @param taskPartitionId task partition id |
| * @return the namespaced task name, which is just namespacedJobName_taskPartitionId |
| */ |
| public static String getNamespacedTaskName(String namespacedJobName, String taskPartitionId) { |
| return String.format("%s_%s", namespacedJobName, taskPartitionId); |
| } |
| |
| /** |
| * Remove the workflow namespace from the job name |
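| * <p>Illustrative example: {@code getDenamespacedJobName("myQueue", "myQueue_job1")} returns |
| * {@code "job1"}. |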
| * @param workflow the name of the workflow that owns the job |
| * @param jobName the namespaced job name |
| * @return the denamespaced job name, or the same job name if it is already denamespaced |
| */ |
| public static String getDenamespacedJobName(String workflow, String jobName) { |
| if (jobName.contains(workflow)) { |
| // skip the entire length of the workflow name plus the underscore |
| return jobName.substring(jobName.indexOf(workflow) + workflow.length() + 1); |
| } else { |
| return jobName; |
| } |
| } |
| |
| /** |
| * Serialize a map of job-level configurations as a single string |
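| * <p>Illustrative round trip (the map content is a placeholder): |
| * <pre>{@code |
| * String serialized = TaskUtil.serializeJobCommandConfigMap( |
| *     Collections.singletonMap("timeout", "1000"));   // {"timeout":"1000"} |
| * Map<String, String> restored = TaskUtil.deserializeJobCommandConfigMap(serialized); |
| * }</pre> |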
| * @param commandConfig map of job config key to config value |
| * @return serialized string |
| */ |
| // TODO: move this to the JobConfig |
| @Deprecated |
| public static String serializeJobCommandConfigMap(Map<String, String> commandConfig) { |
| ObjectMapper mapper = new ObjectMapper(); |
| try { |
| String serializedMap = mapper.writeValueAsString(commandConfig); |
| return serializedMap; |
| } catch (IOException e) { |
| LOG.error("Error serializing " + commandConfig, e); |
| } |
| return null; |
| } |
| |
| /** |
| * Deserialize a single string into a map of job-level configurations |
| * @param commandConfig the serialized job config map |
| * @return a map of job config key to config value |
| */ |
| // TODO: move this to the JobConfig |
| @Deprecated |
| public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig) { |
| ObjectMapper mapper = new ObjectMapper(); |
| try { |
| Map<String, String> commandConfigMap = |
| mapper.readValue(commandConfig, new TypeReference<HashMap<String, String>>() { |
| }); |
| return commandConfigMap; |
| } catch (IOException e) { |
| LOG.error("Error deserializing " + commandConfig, e); |
| } |
| return Collections.emptyMap(); |
| } |
| |
| /** |
| * Extracts the partition id from the given partition name. |
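| * <p>Illustrative example: {@code getPartitionId("myWorkflow_myJob_3")} returns {@code 3}. |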
| * @param pName the partition name, in the form {@code resourceName_partitionId} |
| * @return the numeric partition id |
| */ |
| public static int getPartitionId(String pName) { |
| int index = pName.lastIndexOf("_"); |
| if (index == -1) { |
| throw new HelixException(String.format("Invalid partition name %s", pName)); |
| } |
| return Integer.parseInt(pName.substring(index + 1)); |
| } |
| |
| @Deprecated |
| public static String getWorkflowContextKey(String workflow) { |
| // TODO: fix this to use the keyBuilder. |
| return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow); |
| } |
| |
| @Deprecated |
| public static PropertyKey getWorkflowConfigKey(final HelixDataAccessor accessor, |
| String workflow) { |
| return accessor.keyBuilder().resourceConfig(workflow); |
| } |
| |
| /** |
| * TODO: Task Framework no longer uses IdealState; this is left in for backward compatibility |
| * Cleans up IdealState and external view associated with a job. |
| * @param accessor Accessor to Helix configs |
| * @param job The job name |
| * @return True if removed successfully, otherwise false |
| */ |
| @Deprecated |
| protected static boolean cleanupJobIdealStateExtView(final HelixDataAccessor accessor, |
| String job) { |
| return cleanupIdealStateExtView(accessor, job); |
| } |
| |
| /** |
| * TODO: Task Framework no longer uses IdealState; this is left in for backward compatibility |
| * Cleans up IdealState and external view associated with a workflow. |
| * @param accessor Accessor to Helix configs |
| * @param workflow The workflow name |
| * @return True if removed successfully, otherwise false |
| */ |
| @Deprecated |
| protected static boolean cleanupWorkflowIdealStateExtView(final HelixDataAccessor accessor, |
| String workflow) { |
| return cleanupIdealStateExtView(accessor, workflow); |
| } |
| |
| /** |
| * TODO: Task Framework no longer uses IdealState; this is left in for backward compatibility |
| * Cleans up IdealState and external view associated with a job/workflow resource. |
| */ |
| @Deprecated |
| private static boolean cleanupIdealStateExtView(final HelixDataAccessor accessor, |
| String workflowJobResource) { |
| boolean success = true; |
| PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource); |
| if (accessor.getPropertyStat(isKey) != null) { |
| if (!accessor.removeProperty(isKey)) { |
| LOG.warn(String.format( |
| "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.", |
| workflowJobResource, isKey)); |
| success = false; |
| } |
| } |
| |
| // Delete external view |
| PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource); |
| if (accessor.getPropertyStat(evKey) != null) { |
| if (!accessor.removeProperty(evKey)) { |
| LOG.warn(String.format( |
| "Error occurred while trying to remove ExternalView of resource %s. Failed to remove node %s.", |
| workflowJobResource, evKey)); |
| success = false; |
| } |
| } |
| |
| return success; |
| } |
| |
| /** |
| * Remove a workflow and all jobs for the workflow. This removes the workflow config, idealstate, |
| * externalview and workflow context associated with this workflow, and all job information, |
| * including their configs, contexts, IdealStates and ExternalViews. |
| * @param accessor Accessor to Helix configs |
| * @param propertyStore Property store for the cluster |
| * @param workflow the workflow name. |
| * @param jobs all job names in this workflow. |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeWorkflow(final HelixDataAccessor accessor, |
| final HelixPropertyStore<ZNRecord> propertyStore, String workflow, Set<String> jobs) { |
| // clean up all jobs |
| for (String job : jobs) { |
| if (!removeJob(accessor, propertyStore, job)) { |
| return false; |
| } |
| } |
| |
| if (!removeWorkflowConfig(accessor, workflow)) { |
| LOG.warn("Error occurred while trying to remove workflow config for {}.", workflow); |
| return false; |
| } |
| if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) { |
| LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.", |
| workflow); |
| return false; |
| } |
| if (!removeWorkflowContext(propertyStore, workflow)) { |
| LOG.warn("Error occurred while trying to remove workflow context for {}.", workflow); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Remove a set of jobs from a workflow. This removes the config, context, IdealState and |
| * ExternalView associated with each individual job, removes all the jobs from the |
| * WorkflowConfig, and removes their job states from the WorkflowContext. |
| * @param dataAccessor Accessor to Helix configs |
| * @param propertyStore Property store for the cluster |
| * @param workflow the workflow name |
| * @param jobs the set of namespaced job names to remove |
| * @param maintainDependency whether to preserve job dependencies when removing the jobs from the DAG |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeJobsFromWorkflow(final HelixDataAccessor dataAccessor, |
| final HelixPropertyStore<ZNRecord> propertyStore, final String workflow, |
| final Set<String> jobs, boolean maintainDependency) { |
| boolean success = true; |
| if (!removeJobsFromDag(dataAccessor, workflow, jobs, maintainDependency)) { |
| LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}.", jobs, |
| workflow); |
| success = false; |
| } |
| if (!removeJobsState(propertyStore, workflow, jobs)) { |
| LOG.warn("Error occurred while trying to remove jobs states from workflow {} jobs {}.", |
| workflow, jobs); |
| success = false; |
| } |
| for (String job : jobs) { |
| if (!removeJob(dataAccessor, propertyStore, job)) { |
| success = false; |
| } |
| } |
| |
| return success; |
| } |
| |
| /** |
| * Return all jobs that are COMPLETED and have passed their expiry time. |
| * @param dataAccessor Accessor to Helix configs |
| * @param propertyStore Property store for the cluster |
| * @param workflowConfig the workflow config |
| * @param workflowContext the workflow context |
| * @return the set of expired job names |
| */ |
| protected static Set<String> getExpiredJobs(HelixDataAccessor dataAccessor, |
| HelixPropertyStore<ZNRecord> propertyStore, WorkflowConfig workflowConfig, |
| WorkflowContext workflowContext) { |
| Set<String> expiredJobs = new HashSet<>(); |
| |
| if (workflowContext != null) { |
| Map<String, TaskState> jobStates = workflowContext.getJobStates(); |
| for (String job : workflowConfig.getJobDag().getAllNodes()) { |
| if (expiredJobs.contains(job)) { |
| continue; |
| } |
| JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job); |
| JobContext jobContext = TaskUtil.getJobContext(propertyStore, job); |
| TaskState jobState = jobStates.get(job); |
| if (isJobExpired(job, jobConfig, jobContext, jobState)) { |
| expiredJobs.add(job); |
| |
| // Failed jobs propagation |
| if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) { |
| Stack<String> childrenJobs = new Stack<>(); |
| workflowConfig.getJobDag().getDirectChildren(job).forEach(childrenJobs::push); |
| while (!childrenJobs.isEmpty()) { |
| String childJob = childrenJobs.pop(); |
| // Failed and without context means it's failed due to parental job failure |
| if (!expiredJobs.contains(childJob) && jobStates.get(childJob) == TaskState.FAILED |
| && TaskUtil.getJobContext(propertyStore, childJob) == null) { |
| expiredJobs.add(childJob); |
| workflowConfig.getJobDag().getDirectChildren(childJob).forEach(childrenJobs::push); |
| } |
| } |
| } |
| } |
| } |
| } |
| return expiredJobs; |
| } |
| |
| /** |
| * Based on a workflow's config or context, create a set of jobs that are either expired, which |
| * means they are COMPLETED and have passed their expiration time, or don't have JobConfigs, |
| * meaning that the job might have been deleted manually from a job queue, or is left in the |
| * DAG due to a failed clean-up attempt from the last purge. The difference between this function and |
| * getExpiredJobs() is that this function gets JobConfig and JobContext from a |
| * WorkflowControllerDataProvider instead of Zk. |
| * @param workflowControllerDataProvider the controller cache to read JobConfigs and JobContexts from |
| * @param workflowConfig the workflow config |
| * @param workflowContext the workflow context |
| * @param manager a connection to Helix, used as a fallback when the cache is missing a JobConfig |
| * @return the set of expired job names |
| */ |
| public static Set<String> getExpiredJobsFromCache( |
| WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig, |
| WorkflowContext workflowContext, HelixManager manager) { |
| Set<String> expiredJobs = new HashSet<>(); |
| Map<String, TaskState> jobStates = workflowContext.getJobStates(); |
| for (String job : workflowConfig.getJobDag().getAllNodes()) { |
| if (expiredJobs.contains(job)) { |
| continue; |
| } |
| JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job); |
| // TODO: Temporary solution for cache selective update race conditions |
| if (jobConfig == null) { |
| jobConfig = TaskUtil.getJobConfig(manager, job); |
| } |
| |
| JobContext jobContext = workflowControllerDataProvider.getJobContext(job); |
| TaskState jobState = jobStates.get(job); |
| if (isJobExpired(job, jobConfig, jobContext, jobState)) { |
| expiredJobs.add(job); |
| |
| // Failed jobs propagation |
| if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) { |
| Stack<String> childrenJobs = new Stack<>(); |
| workflowConfig.getJobDag().getDirectChildren(job).forEach(childrenJobs::push); |
| while (!childrenJobs.isEmpty()) { |
| String childJob = childrenJobs.pop(); |
| // Failed and without context means it's failed due to parental job failure |
| if (!expiredJobs.contains(childJob) && jobStates.get(childJob) == TaskState.FAILED |
| && workflowControllerDataProvider.getJobContext(childJob) == null) { |
| expiredJobs.add(childJob); |
| workflowConfig.getJobDag().getDirectChildren(childJob).forEach(childrenJobs::push); |
| } |
| } |
| } |
| } |
| } |
| return expiredJobs; |
| } |
| |
| /* |
| * Checks if a job is expired and should be purged. This includes a special case when jobConfig |
| * is null. That happens when a job might have been deleted manually from a job queue, or is |
| * left in the DAG due to a failed clean-up attempt from the last purge. |
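| * For example, a COMPLETED job with finish time t and expiry e is considered expired once |
| * System.currentTimeMillis() >= t + e; FAILED or TIMED_OUT jobs use getTerminalStateExpiry() instead. |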
| */ |
| private static boolean isJobExpired(String jobName, JobConfig jobConfig, JobContext jobContext, |
| TaskState jobState) { |
| if (jobConfig == null) { |
| LOG.warn( |
| "Job {} exists in JobDAG but JobConfig is missing! It's treated as expired and will be purged.", |
| jobName); |
| return true; |
| } |
| if (jobContext == null || jobContext.getFinishTime() == WorkflowContext.UNFINISHED) { |
| return false; |
| } |
| long jobFinishTime = jobContext.getFinishTime(); |
| long expiry = jobConfig.getExpiry(); |
| long terminalStateExpiry = jobConfig.getTerminalStateExpiry(); |
| return jobState == TaskState.COMPLETED && System.currentTimeMillis() >= jobFinishTime + expiry |
| || (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) |
| && terminalStateExpiry > 0 |
| && System.currentTimeMillis() >= jobFinishTime + terminalStateExpiry; |
| } |
| |
| /** |
| * Remove Job Config, IS/EV, and Context in order. Job name here must be a namespaced job name. |
| * @param accessor Accessor to Helix configs |
| * @param propertyStore Property store for the cluster |
| * @param job namespaced job name |
| * @return True if removed successfully, otherwise false |
| */ |
| protected static boolean removeJob(HelixDataAccessor accessor, |
| HelixPropertyStore<ZNRecord> propertyStore, String job) { |
| if (!removeJobConfig(accessor, job)) { |
| LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job)); |
| return false; |
| } |
| if (!cleanupJobIdealStateExtView(accessor, job)) { |
| LOG.warn( |
| "Error occurred while trying to remove job idealstate/externalview for {}.", job); |
| return false; |
| } |
| if (!removeJobContext(propertyStore, job)) { |
| LOG.warn("Error occurred while trying to remove job context for {}.", job); |
| return false; |
| } |
| return true; |
| } |
| |
| /** Remove the given job names from the DAG in the workflow/queue configuration */ |
| // Job names should be namespaced job names here. |
| protected static boolean removeJobsFromDag(final HelixDataAccessor accessor, |
| final String workflow, final Set<String> jobsToRemove, final boolean maintainDependency) { |
| // Now atomically clear the DAG |
| DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { |
| @Override |
| public ZNRecord update(ZNRecord currentData) { |
| if (currentData != null) { |
| JobDag jobDag = JobDag.fromJson( |
| currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name())); |
| if (jobDag == null) { |
| LOG.warn("Could not update DAG for workflow: {} JobDag is null.", workflow); |
| return null; |
| } |
| for (String job : jobsToRemove) { |
| jobDag.removeNode(job, maintainDependency); |
| } |
| try { |
| currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), |
| jobDag.toJson()); |
| } catch (IOException e) { |
| throw new IllegalArgumentException(e); |
| } |
| } |
| return currentData; |
| } |
| }; |
| |
| String configPath = accessor.keyBuilder().resourceConfig(workflow).getPath(); |
| if (!accessor.getBaseDataAccessor().update(configPath, dagRemover, AccessOption.PERSISTENT)) { |
| LOG.warn("Failed to remove jobs {} from DAG of workflow {}", jobsToRemove, workflow); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Update the workflow's property to remove jobs from JOB_STATES if they have already started. |
| */ |
| protected static boolean removeJobsState(final HelixPropertyStore<ZNRecord> propertyStore, |
| final String workflow, final Set<String> jobs) { |
| String contextPath = |
| Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflow, TaskUtil.CONTEXT_NODE); |
| |
| // If the queue has not started, there are no job states that need to be removed. |
| if (!propertyStore.exists(contextPath, 0)) { |
| return true; |
| } |
| |
| DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { |
| @Override |
| public ZNRecord update(ZNRecord currentData) { |
| if (currentData != null) { |
| WorkflowContext workflowContext = new WorkflowContext(currentData); |
| workflowContext.removeJobStates(jobs); |
| workflowContext.removeJobStartTime(jobs); |
| currentData = workflowContext.getRecord(); |
| } |
| return currentData; |
| } |
| }; |
| if (!propertyStore.update(contextPath, updater, AccessOption.PERSISTENT)) { |
| LOG.warn("Fail to remove job state for jobs {} from workflow {}", jobs, workflow); |
| return false; |
| } |
| return true; |
| } |
| |
| private static boolean removeWorkflowJobContext(HelixPropertyStore<ZNRecord> propertyStore, |
| String workflowJobResource) { |
| String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource); |
| if (propertyStore.exists(path, AccessOption.PERSISTENT)) { |
| if (!propertyStore.remove(path, AccessOption.PERSISTENT)) { |
| LOG.warn( |
| "Error occurred while trying to remove workflow/jobcontext for {}. Failed to remove node {}.", |
| workflowJobResource, path); |
| return false; |
| } |
| } |
| LOG.info("removed job context {}.", path); |
| return true; |
| } |
| |
| /** |
| * Remove workflow or job config. |
| * @param accessor |
| * @param workflowJobResource the workflow or job name |
| */ |
| private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor, |
| String workflowJobResource) { |
| PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource); |
| if (accessor.getPropertyStat(cfgKey) != null) { |
| if (!accessor.removeProperty(cfgKey)) { |
| LOG.warn("Error occurred while trying to remove config for {}. Failed to remove node {}.", |
| workflowJobResource, cfgKey); |
| return false; |
| } |
| } |
| LOG.info("removed job config {}.", cfgKey.getPath()); |
| return true; |
| } |
| |
| /** |
| * Create the resource config. Fails if it already exists in ZK. |
| * @param accessor Accessor to Helix configs |
| * @param resource The resource name |
| * @param resourceConfig The resource config to be created |
| * @return True if created successfully, otherwise false |
| */ |
| private static boolean createResourceConfig(HelixDataAccessor accessor, String resource, |
| ResourceConfig resourceConfig) { |
| PropertyKey.Builder keyBuilder = accessor.keyBuilder(); |
| return accessor.getBaseDataAccessor().create(keyBuilder.resourceConfig(resource).getPath(), |
| resourceConfig.getRecord(), AccessOption.PERSISTENT); |
| } |
| |
| /** |
| * Set the resource config |
| * @param accessor Accessor to Helix configs |
| * @param resource The resource name |
| * @param resourceConfig The resource config to be set |
| * @return True if set successfully, otherwise false |
| */ |
| private static boolean setResourceConfig(HelixDataAccessor accessor, String resource, |
| ResourceConfig resourceConfig) { |
| PropertyKey.Builder keyBuilder = accessor.keyBuilder(); |
| return accessor.setProperty(keyBuilder.resourceConfig(resource), resourceConfig); |
| } |
| |
| private static HelixProperty getResourceConfig(HelixDataAccessor accessor, String resource) { |
| PropertyKey.Builder keyBuilder = accessor.keyBuilder(); |
| return accessor.getProperty(keyBuilder.resourceConfig(resource)); |
| } |
| |
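| /** |
| * Return the set of partitions that are not yet ready to be scheduled, i.e. whose next retry |
| * time has not been reached. |
| * @param ctx the job context |
| * @param now the current time in milliseconds |
| * @return the set of partition ids whose next retry time is still in the future |
| */ |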
| public static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) { |
| Set<Integer> nonReadyPartitions = Sets.newHashSet(); |
| for (int p : ctx.getPartitionSet()) { |
| long toStart = ctx.getNextRetryTime(p); |
| if (now < toStart) { |
| nonReadyPartitions.add(p); |
| } |
| } |
| return nonReadyPartitions; |
| } |
| |
| /** |
| * Returns whether a given job is a generic job (not a targeted job). |
| * @param jobConfig the job config to check |
| * @return true if the job is a generic (non-targeted) job, false otherwise |
| */ |
| public static boolean isGenericTaskJob(JobConfig jobConfig) { |
| // Targeted jobs may have TaskConfigs, so we check whether the target resource is set |
| return jobConfig.getTargetResource() == null || jobConfig.getTargetResource().equals(""); |
| } |
| |
| /** |
| * Check whether all tasks of a job have stopped (i.e. none are still running) |
| * @param jobContext The job context |
| * @return False if any task is still in RUNNING state, otherwise true |
| */ |
| public static boolean checkJobStopped(JobContext jobContext) { |
| for (int partition : jobContext.getPartitionSet()) { |
| TaskPartitionState taskState = jobContext.getPartitionState(partition); |
| if (taskState == TaskPartitionState.RUNNING) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Count the number of jobs in a workflow that are not in a final state. |
| * @param workflowCfg the workflow config |
| * @param workflowCtx the workflow context |
| * @return the number of jobs that are IN_PROGRESS, STOPPED, or STOPPING |
| */ |
| public static int getInCompleteJobCount(WorkflowConfig workflowCfg, WorkflowContext workflowCtx) { |
| int inCompleteCount = 0; |
| for (String jobName : workflowCfg.getJobDag().getAllNodes()) { |
| TaskState jobState = workflowCtx.getJobState(jobName); |
| if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED |
| || jobState == TaskState.STOPPING) { |
| ++inCompleteCount; |
| } |
| } |
| |
| return inCompleteCount; |
| } |
| |
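| /** |
| * Check whether a job has started, based on its state in the workflow context. |
| * @param job the namespaced job name |
| * @param workflowContext the workflow context |
| * @return true if the job has a recorded state other than NOT_STARTED |
| */ |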
| public static boolean isJobStarted(String job, WorkflowContext workflowContext) { |
| TaskState jobState = workflowContext.getJobState(job); |
| return (jobState != null && jobState != TaskState.NOT_STARTED); |
| } |
| |
| /** |
| * Clean up all jobs that are marked as expired. |
| */ |
| public static void purgeExpiredJobs(String workflow, Set<String> expiredJobs, |
| HelixManager manager, RebalanceScheduler rebalanceScheduler) { |
| Set<String> failedJobRemovals = new HashSet<>(); |
| for (String job : expiredJobs) { |
| if (!TaskUtil |
| .removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), job)) { |
| failedJobRemovals.add(job); |
| LOG.warn("Failed to clean up expired and completed jobs from workflow {}!", workflow); |
| } |
| rebalanceScheduler.removeScheduledRebalance(job); |
| } |
| |
| // If the job removal failed, make sure we do NOT prematurely delete it from DAG so that the |
| // removal will be tried again at next purge |
| expiredJobs.removeAll(failedJobRemovals); |
| |
| if (!TaskUtil.removeJobsFromDag(manager.getHelixDataAccessor(), workflow, expiredJobs, true)) { |
| LOG.warn("Error occurred while trying to remove jobs {} from the workflow {}!", expiredJobs, |
| workflow); |
| } |
| |
| if (expiredJobs.size() > 0) { |
| // Updating the workflow context is done in the main pipeline, not here; otherwise it would |
| // cause a concurrent-write issue. It is possible that jobs were purged but there is no event |
| // to trigger the pipeline to clean up the context. |
| HelixDataAccessor accessor = manager.getHelixDataAccessor(); |
| List<String> resourceConfigs = |
| accessor.getChildNames(accessor.keyBuilder().resourceConfigs()); |
| if (resourceConfigs.size() > 0) { |
| RebalanceUtil.scheduleOnDemandPipeline(manager.getClusterName(), 0L); |
| } else { |
| LOG.warn("No resource config to trigger rebalance for clean up contexts for {}!", |
| expiredJobs); |
| } |
| } |
| } |
| |
| /** |
| * The function that removes IdealStates and workflow contexts of the workflows that need to be |
| * deleted. |
| * @param toBePurgedWorkflows the set of workflow names to delete |
| * @param manager a connection to Helix |
| */ |
| public static void workflowGarbageCollection(final Set<String> toBePurgedWorkflows, |
| final HelixManager manager) { |
| HelixDataAccessor accessor = manager.getHelixDataAccessor(); |
| HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore(); |
| |
| for (String workflowName : toBePurgedWorkflows) { |
| LOG.warn( |
| "WorkflowContext exists for workflow {}. However, Workflow Config is missing! Deleting the WorkflowContext and IdealState!!", |
| workflowName); |
| |
| // TODO: We don't need this in the future when TF is not relying on IS/EV anymore. |
| if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) { |
| LOG.warn("Error occurred while trying to remove workflow idealstate/externalview for {}.", |
| workflowName); |
| continue; |
| } |
| |
| if (!removeWorkflowContext(propertyStore, workflowName)) { |
| LOG.warn("Error occurred while trying to remove workflow context for {}.", workflowName); |
| } |
| } |
| } |
| |
| /** |
| * The function that removes IdealStates and job contexts of the jobs that need to be |
| * deleted. |
| * Warning: This method should only be used for jobs that have a job context but do not have a |
| * job config. |
| * @param jobsWithoutConfig the set of namespaced job names that have a JobContext but no JobConfig |
| * @param manager a connection to Helix |
| */ |
| public static void jobGarbageCollection(final Set<String> jobsWithoutConfig, |
| final HelixManager manager) { |
| for (String jobName : jobsWithoutConfig) { |
| LOG.warn( |
| "JobContext exists for job {}. However, job Config is missing! Deleting the JobContext and IdealState!!", |
| jobName); |
| if (!TaskUtil.removeJob(manager.getHelixDataAccessor(), manager.getHelixPropertyStore(), |
| jobName)) { |
| LOG.warn("Failed to clean up the job {}", jobName); |
| } |
| } |
| } |
| |
| /** |
| * Get target thread pool size from InstanceConfig first; if InstanceConfig doesn't exist or the |
| * value is undefined, try ClusterConfig; if the value is undefined in ClusterConfig, fall back |
| * to the default value. |
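| * <p>Illustrative usage (assumes {@code zkClient} is an already-connected {@link RealmAwareZkClient}; |
| * the cluster and instance names are placeholders): |
| * <pre>{@code |
| * int poolSize = TaskUtil.getTargetThreadPoolSize(zkClient, "MyCluster", "localhost_12918"); |
| * }</pre> |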
| * @param zkClient - ZooKeeper connection for config reading |
| * @param clusterName - the cluster name for InstanceConfig and ClusterConfig |
| * @param instanceName - the instance name for InstanceConfig |
| * @return target thread pool size |
| */ |
| public static int getTargetThreadPoolSize(RealmAwareZkClient zkClient, String clusterName, |
| String instanceName) { |
| ConfigAccessor configAccessor = new ConfigAccessor(zkClient); |
| |
| // Check instance config first for thread pool size |
| if (ZKUtil.isInstanceSetup(zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) { |
| InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); |
| if (instanceConfig != null) { |
| int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize(); |
| // Reject negative values. The pool size is only negative when it's not set in |
| // InstanceConfig, or when the users bypassed the setter logic in InstanceConfig. We treat |
| // negative values as the value is not set, and continue with ClusterConfig. |
| if (targetTaskThreadPoolSize >= 0) { |
| return targetTaskThreadPoolSize; |
| } |
| } else { |
| LOG.warn( |
| "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ", |
| instanceName, clusterName); |
| } |
| } |
| |
| ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); |
| if (clusterConfig != null) { |
| int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize(); |
| // Reject negative values. The pool size is only negative when it's not set in |
| // ClusterConfig, or when the users bypassed the setter logic in ClusterConfig. We treat |
| // negative values as the value is not set, and continue with the default value. |
| if (globalTargetTaskThreadPoolSize >= 0) { |
| return globalTargetTaskThreadPoolSize; |
| } |
| } else { |
| LOG.warn("Got null as ClusterConfig for cluster {}. Returning default value: {}. ", |
| clusterName, TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE); |
| } |
| |
| return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; |
| } |
| } |