blob: b62ef007a30051aa924459f75c75e6237ff976ff [file]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Helpers for composing Thermos workflows."""
# checkstyle: noqa
import itertools
from functools import reduce
from pystachio import Empty, List
from twitter.common.lang import Compatibility
from .schema_base import GB, Constraint, Process, Resources, Task
__all__ = (
# shorthand for process ordering constraint
'order',
# task combinators
'combine_tasks', # merge N tasks in parallel
'concat_tasks', # serialize N tasks
# options helpers
'java_options',
'python_options',
# the automatically-sequential version of a task
'SequentialTask',
# create a simple task from a command line + name
'SimpleTask',
# helper classes
'Options',
'Processes',
'Tasks',
'Units',
)
class Units(object):
"""Helpers for base units of Tasks and Processes."""
@classmethod
def safe_get(cls, unit):
return 0 if unit is Empty else unit.get()
@classmethod
def optional_resources(cls, resources):
return Resources() if resources is Empty else resources
@classmethod
def resources_sum(cls, *resources):
"""Add two Resources objects together."""
def add_unit(f1, f2):
return cls.safe_get(f1) + cls.safe_get(f2)
def add(r1, r2):
return Resources(cpu=add_unit(r1.cpu(), r2.cpu()),
ram=add_unit(r1.ram(), r2.ram()),
disk=add_unit(r1.disk(), r2.disk()),
gpu=add_unit(r1.gpu(), r2.gpu()))
return reduce(
add,
map(cls.optional_resources, resources),
Resources(cpu=0, ram=0, disk=0, gpu=0))
@classmethod
def finalization_wait_sum(cls, waits):
"""Return a finalization_wait that is the sum of the inputs"""
return sum(map(cls.safe_get, waits))
@classmethod
def resources_max(cls, resources):
"""Return a Resource object that is the maximum of the inputs along each
resource dimension."""
def max_unit(f1, f2):
return max(cls.safe_get(f1), cls.safe_get(f2))
def resource_max(r1, r2):
return Resources(cpu=max_unit(r1.cpu(), r2.cpu()),
ram=max_unit(r1.ram(), r2.ram()),
disk=max_unit(r1.disk(), r2.disk()),
gpu=max_unit(r1.gpu(), r2.gpu()))
return reduce(
resource_max,
map(cls.optional_resources, resources),
Resources(cpu=0, ram=0, disk=0, gpu=0))
@classmethod
def finalization_wait_max(cls, waits):
"""Return a finalization_wait that is the maximum of the inputs"""
return max([0] + [cls.safe_get(unit) for unit in waits])
@classmethod
def processes_merge(cls, tasks):
"""Return a deduped list of the processes from all tasks."""
return list(set(itertools.chain.from_iterable(task.processes() for task in tasks)))
@classmethod
def constraints_merge(cls, tasks):
"""Return a deduped list of the constraints from all tasks."""
return list(set(itertools.chain.from_iterable(task.constraints() for task in tasks)))
class Processes(object):
"""Helper class for Process objects."""
@classmethod
def _process_name(cls, process):
if isinstance(process, Process):
return process.name()
elif isinstance(process, Compatibility.string):
return process
raise ValueError("Unknown value for process order: %s" % repr(process))
@classmethod
def order(cls, *processes):
"""Given a list of processes, return the list of constraints that keeps them in order, e.g.
order(p1, p2, p3) => [Constraint(order=[p1.name(), p2.name(), p3.name()])].
Similarly, concatenation operations are valid, i.e.
order(p1, p2) + order(p2, p3) <=> order(p1, p2, p3)
"""
return [Constraint(order=[cls._process_name(p) for p in processes])]
class Tasks(object):
"""Helper class for Task objects."""
SIMPLE_CPU = 1.0
SIMPLE_RAM = 1 * GB
SIMPLE_DISK = 1 * GB
@classmethod
def _combine_processes(cls, *tasks):
"""Given multiple tasks, merge their processes together, retaining the identity of the first
task."""
if len(tasks) == 0:
return Task()
head_task = tasks[-1]
return head_task(processes=Units.processes_merge(tasks))
@classmethod
def combine(cls, *tasks, **kw):
"""Given multiple tasks, return a Task that runs all processes in parallel."""
if len(tasks) == 0:
return Task()
base = cls._combine_processes(*tasks)
return base(
resources=Units.resources_sum(*(task.resources() for task in tasks)),
constraints=Units.constraints_merge(tasks),
finalization_wait=Units.finalization_wait_max(task.finalization_wait() for task in tasks),
**kw
)
@classmethod
def concat(cls, *tasks, **kw):
"""Given tasks T1...TN, return a single Task that runs all processes such that
all processes in Tk run before any process in Tk+1."""
if len(tasks) == 0:
return Task()
base = cls._combine_processes(*tasks)
base_constraints = Units.constraints_merge(tasks)
# TODO(wickman) be smarter about this in light of existing constraints
for (t1, t2) in zip(tasks[0:-1], tasks[1:]):
for p1 in t1.processes():
for p2 in t2.processes():
if p1 != p2:
base_constraints.extend(Processes.order(p1, p2))
return base(
resources=Units.resources_max(task.resources() for task in tasks),
constraints=base_constraints,
finalization_wait=Units.finalization_wait_sum(task.finalization_wait() for task in tasks),
**kw)
@classmethod
def simple(cls, name, command):
"""Create a usable Task from a provided name + command line and a default set of resources"""
return Task(
name=name,
processes=[Process(name=name, cmdline=command)],
resources=Resources(cpu=cls.SIMPLE_CPU,
ram=cls.SIMPLE_RAM,
disk=cls.SIMPLE_DISK))
@classmethod
def sequential(cls, task):
"""Add a constraint that makes all processes within a task run sequentially."""
def maybe_constrain(task):
return {'constraints': order(*task.processes())} if task.processes() is not Empty else {}
if task.constraints() is Empty or task.constraints() == List(Constraint)([]):
return task(**maybe_constrain(task))
raise ValueError('Cannot turn a Task with existing constraints into a SequentialTask!')
class Options(object):
"""Helper class for constructing command-line arguments."""
@classmethod
def render_option(cls, short_prefix, long_prefix, option, value=None):
option = '%s%s' % (short_prefix if len(option) == 1 else long_prefix, option)
return '%s %s' % (option, value) if value else option
@classmethod
def render_options(cls, short_prefix, long_prefix, *options, **kw_options):
renders = []
for option in options:
if isinstance(option, Compatibility.string):
renders.append(cls.render_option(short_prefix, long_prefix, option))
elif isinstance(option, dict):
# preserve order in case option is an OrderedDict, rather than recursing with **option
for argument, value in option.items():
renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
else:
raise ValueError('Got an unexpected argument to render_options: %s' % repr(option))
for argument, value in kw_options.items():
renders.append(cls.render_option(short_prefix, long_prefix, argument, value))
return renders
@classmethod
def java(cls, *options, **kw_options):
"""
Given a set of arguments, keyword arguments or dictionaries, render
command-line parameters accordingly. For example:
java_options('a', 'b') == '-a -b'
java_options({
'a': 23,
'b': 'foo'
}) == '-a 23 -b foo'
java_options(a=23, b='foo') == '-a 23 -b foo'
"""
return ' '.join(cls.render_options('-', '-', *options, **kw_options))
@classmethod
def python(cls, *options, **kw_options):
"""
Given a set of arguments, keyword arguments or dictionaries, render
command-line parameters accordingly. Single letter parameters are
rendered with single '-'. For example:
python_options('a', 'boo') == '-a --boo'
python_options({
'a': 23,
'boo': 'foo'
}) == '-a 23 --boo foo'
python_options(a=23, boo='foo') == '-a 23 --boo foo'
"""
return ' '.join(cls.render_options('-', '--', *options, **kw_options))
def SimpleTask(name, command):
"""A simple command-line Task with default resources"""
return Tasks.simple(name, command)
def SequentialTask(*args, **kw):
"""A Task whose processes are always sequential."""
return Tasks.sequential(Task(*args, **kw))
python_options = Options.python
java_options = Options.java
combine_tasks = Tasks.combine
concat_tasks = Tasks.concat
order = Processes.order