Quota-based task scheduling is a feature addition to Helix Task Framework that enables users of Task Framework to apply the notion of categories in distributed task management.
As Helix Task Framework gains usage in other open-source frameworks such as Apache Gobblin and Apache Pinot, it has also seen an increase in the variety of distributed tasks it manages. There have also been explicit feature requests to Helix for distinguishing between types of tasks by creating corresponding quotas.
Quota-based task scheduling aims to fulfill these requests by allowing users to define a quota profile consisting of quota types and their corresponding quotas. The goal of this feature is threefold: 1) give the user the ability to prioritize one type of workflow/job/task over another, 2) achieve isolation among types of tasks, and 3) make monitoring easier by tracking the status of distributed execution by type.
AssignableInstance is an abstraction that represents each live Participant that is able to take on tasks from the Controller. Each AssignableInstance will cache what tasks it has running as well as remaining task counts from the quota-based capacity calculation.
AssignableInstanceManager manages all AssignableInstances. It also serves as a connecting layer between the Controller and each AssignableInstance. AssignableInstanceManager also provides a set of interfaces that allows the Controller to easily determine whether an AssignableInstance is able to take on more tasks.
The TaskAssigner interface provides basic API methods for assigning tasks based on quota constraints. Currently, Task Framework only considers the number of Participant-side JVM threads, each of which corresponds to an active task.
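To make the bookkeeping described above concrete, here is a minimal sketch of how a per-Participant cache of running tasks and remaining capacity might look when the only resource is JVM threads. The class name `QuotaCapacityTracker` and its methods are hypothetical and chosen for illustration; they are not the actual AssignableInstance or TaskAssigner API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-Participant bookkeeping; NOT the actual Helix AssignableInstance API. */
final class QuotaCapacityTracker {
  private final Map<String, Integer> totalThreads = new HashMap<>();
  private final Map<String, Integer> usedThreads = new HashMap<>();

  /** @param threadsPerQuotaType number of JVM threads allotted to each quota type */
  QuotaCapacityTracker(Map<String, Integer> threadsPerQuotaType) {
    totalThreads.putAll(threadsPerQuotaType);
    for (String type : threadsPerQuotaType.keySet()) {
      usedThreads.put(type, 0);
    }
  }

  /** Reserves one thread for the quota type; returns false if the type is full or unknown. */
  boolean tryAssign(String quotaType) {
    Integer total = totalThreads.get(quotaType);
    if (total == null) {
      return false;
    }
    int used = usedThreads.get(quotaType);
    if (used >= total) {
      return false;
    }
    usedThreads.put(quotaType, used + 1);
    return true;
  }

  /** Frees one thread when a task of the given quota type finishes. */
  void release(String quotaType) {
    Integer used = usedThreads.get(quotaType);
    if (used != null && used > 0) {
      usedThreads.put(quotaType, used - 1);
    }
  }

  /** Remaining task slots for the quota type (0 for unknown types). */
  int remaining(String quotaType) {
    Integer total = totalThreads.get(quotaType);
    return total == null ? 0 : total - usedThreads.get(quotaType);
  }
}
```

With a tracker like this, the Controller-side question "can this instance take one more task of type A?" reduces to a single `tryAssign("A")` call, which is the kind of check the manager layer is described as exposing.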
RuntimeJobDag is a new component that serves as an iterator over JobDAGs for the Controller. Previously, task assignment required the Controller to iterate through all jobs and their underlying tasks to determine whether any tasks needed to be assigned and scheduled. This proved to be inefficient and did not scale with the increasing load we were putting on Task Framework. Each RuntimeJobDag records state; that is, it knows which task needs to be offered up to the Controller for scheduling next. This saves the Controller redundant computation every time it goes through the TaskSchedulingStage of the Task pipeline.
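The idea of a stateful DAG iterator can be illustrated with a small sketch. The class below is hypothetical (it is not RuntimeJobDag and does not mirror its API); it only shows the general technique of remembering how many unfinished parents each job has, so that the next ready job can be handed out without rescanning the whole DAG.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stateful DAG walker; it only illustrates the idea behind RuntimeJobDag. */
final class ReadyJobTracker {
  private final Map<String, List<String>> children = new HashMap<>();
  private final Map<String, Integer> unfinishedParents = new HashMap<>();
  private final Queue<String> ready = new ArrayDeque<>();

  void addJob(String job) {
    children.putIfAbsent(job, new ArrayList<>());
    unfinishedParents.putIfAbsent(job, 0);
  }

  /** Declares that child may only run after parent has finished. */
  void addDependency(String parent, String child) {
    addJob(parent);
    addJob(child);
    children.get(parent).add(child);
    unfinishedParents.merge(child, 1, Integer::sum);
  }

  /** Call once after the DAG is built: jobs without parents are ready immediately. */
  void start() {
    for (Map.Entry<String, Integer> e : unfinishedParents.entrySet()) {
      if (e.getValue() == 0) {
        ready.add(e.getKey());
      }
    }
  }

  /** Returns the next job to offer for scheduling, or null if none is ready right now. */
  String nextReadyJob() {
    return ready.poll();
  }

  /** Marks a job finished and unlocks children whose parents have all finished. */
  void finish(String job) {
    for (String child : children.getOrDefault(job, Collections.emptyList())) {
      if (unfinishedParents.merge(child, -1, Integer::sum) == 0) {
        ready.add(child);
      }
    }
  }
}
```

Each call to `nextReadyJob()` is constant time, and `finish()` touches only the direct children of the finished job, which is where the savings over a full scan of all jobs and tasks would come from.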
Quota-based task scheduling works as follows. If quota types are set, Task Framework computes, for each quota type, the ratio of its quota config number to the sum of all quota config numbers. It then applies that ratio to the available resources to find the actual amount allotted to each quota type. Here is an example to illustrate this. Suppose the quota config is as follows:
"QUOTA_TYPES":{ "A":"2" ,"B":"1" ,"DEFAULT":"1" }
Task Framework converts these raw numbers into ratios and then applies each ratio to find the actual resource amount per quota type. The following table summarizes these calculations, assuming 40 JVM threads per instance:
Quota Type | Quota Config | Ratio | Actual Resource Allotted (# of JVM Threads) |
---|---|---|---|
A | 2 | 50% | 20 |
B | 1 | 25% | 10 |
DEFAULT | 1 | 25% | 10 |
Every instance (node) will have a quota profile that looks like this. This has a few implications. First, this allows for prioritization of certain jobs by allotting a greater amount of resources to corresponding quota types. In that sense, you may treat quota config numbers/ratios as user-defined priority values. More specifically, take the quota profile in the example above. In this case, when there are 100 jobs submitted for each quota type, jobs of type A will finish faster; in other words, quota type A will see twice as much throughput when there is a continuous stream of jobs due to its quota ratio being twice that of other quota types.
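The ratio calculation from the table above can be sketched in a few lines of Java. This is an illustration under stated assumptions, not Task Framework's actual implementation: the class and method names are hypothetical, and the rounding rule is an assumption because the text does not say how fractional thread counts are handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that reproduces the table's arithmetic; not Helix code. */
final class QuotaAllotmentSketch {

  /** Converts raw quota config numbers into per-type thread counts for one instance. */
  static Map<String, Integer> allot(Map<String, Integer> quotaConfig, int threadsPerInstance) {
    int sum = 0;
    for (int value : quotaConfig.values()) {
      sum += value;
    }
    if (sum <= 0) {
      throw new IllegalArgumentException("Quota config numbers must sum to a positive value");
    }
    Map<String, Integer> threads = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> e : quotaConfig.entrySet()) {
      // ratio = value / sum; allotted threads = ratio * threadsPerInstance.
      // Rounding to the nearest integer is an assumption made for this sketch.
      double ratio = (double) e.getValue() / sum;
      threads.put(e.getKey(), (int) Math.round(ratio * threadsPerInstance));
    }
    return threads;
  }

  public static void main(String[] args) {
    Map<String, Integer> config = new LinkedHashMap<>();
    config.put("A", 2);
    config.put("B", 1);
    config.put("DEFAULT", 1);
    System.out.println(allot(config, 40)); // prints {A=20, B=10, DEFAULT=10}
  }
}
```

Because only the ratios matter, a config of 20/10/10 yields the same allotment as 2/1/1.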
Quota-based task scheduling also allows for isolation/compartmentalization in scheduling jobs. Suppose there are two categories of jobs. Jobs in the first category are urgent: they are short-lived but need to be run right away. Jobs in the second category tend to take longer, but they aren't as urgent and can take their time running. Previously, these two types of jobs would get assigned, scheduled, and run in a mix, and it was difficult to ensure that jobs in the first category were processed promptly. Quota-based scheduling solves this problem by allowing the user to create quota types that model “categories” with different characteristics and requirements.
In order to use quota-based task scheduling, you must establish a quota config first. This is a one-time operation, and once you have verified that your ClusterConfig has a quota config set, there is no need to set it again. See the following code snippet for an example:

    ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME); // Retrieve ClusterConfig
    clusterConfig.resetTaskQuotaRatioMap(); // Optional: you may want to reset the quota config before creating a new quota config
    clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 10); // Define the default quota (DEFAULT_QUOTA_TYPE = "DEFAULT")
    clusterConfig.setTaskQuotaRatio("A", 20); // Define quota type A
    clusterConfig.setTaskQuotaRatio("B", 10); // Define quota type B
    _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig); // Set the new ClusterConfig

A word of caution - if you do set the quota config, you must always define the default quota type (with the key “DEFAULT”). Otherwise, jobs with no type information will no longer be scheduled and run. If you have been using Task Framework prior to the inception of quota-based scheduling, you might have recurrent workflows whose jobs do not have any type set. If you neglect to include the default quota type, these recurrent workflows will not execute properly.
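A small guard can catch a missing default quota type before the config is written. The sketch below works on a plain `Map<String, String>` that is assumed to mirror the `QUOTA_TYPES` field; the class and method names are hypothetical, and treating an empty map as "no quota config set" is an assumption made for this example.

```java
import java.util.Map;

/** Hypothetical pre-flight check; not part of the Helix API. */
final class DefaultQuotaCheck {
  static final String DEFAULT_QUOTA_TYPE = "DEFAULT";

  /**
   * Returns true if jobs without a type can still be scheduled under this quota config.
   * Assumption: an empty map models a cluster where no quota config has been set.
   */
  static boolean isSafeForUntypedJobs(Map<String, String> quotaTypes) {
    return quotaTypes.isEmpty() || quotaTypes.containsKey(DEFAULT_QUOTA_TYPE);
  }
}
```

Running such a check just before writing the ClusterConfig would flag a profile like `{"A":"20","B":"10"}`, which would otherwise leave untyped recurrent workflows unscheduled.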
Upon setting the quota config in ClusterConfig, you will see the updated field in your ZooKeeper cluster config ZNode in JSON format. See an example below:

    {
      "id":"Example_Cluster"
      ,"simpleFields":{
        "allowParticipantAutoJoin":"true"
      }
      ,"listFields":{
      }
      ,"mapFields":{
        "QUOTA_TYPES":{
          "A":"20"
          ,"B":"10"
          ,"DEFAULT":"10"
        }
      }
    }

Once the quota config is in place, assign a quota type to a job through its JobConfig.Builder:

    JobConfig.Builder jobBuilderA = new JobConfig.Builder()
        .setCommand(JOB_COMMAND)
        .setJobCommandConfigMap(_jobCommandMap)
        .addTaskConfigs(taskConfigsA)
        .setNumConcurrentTasksPerInstance(50)
        .setJobType("A"); // Setting the job quota type as "A"
    workflowBuilder.addJob("JOB_A", jobBuilderA);

Quota-based task scheduling has been tested internally at LinkedIn and has been integrated into Apache Gobblin, enabling users of Helix Task Framework and Gobblin's Job Launcher to define categories and corresponding quota values. There are a few immediate to-dos that will improve the usability of this feature:
Currently, quota profiles apply across the entire cluster; that is, one quota profile defined in ClusterConfig will apply globally for all Participants. However, some use cases may require that each Participant have a different quota profile.
Helix Task Framework has the maximum number of task threads set at 40. Making this configurable will potentially allow some users to increase throughput of tasks depending on the duration of execution of such tasks.
Currently, the number of JVM threads per Participant is the only dimension in which Helix Task Framework defines quotas. However, as discussed in earlier sections, this can be extended to commonly used constraints such as CPU usage, memory usage, or disk usage. As new dimensions are added, additional implementations of the TaskAssigner interface will be needed to produce task assignments based on those constraints.