blob: 5c23d86b6c02f549567a6b2005f4abd4a144c29a [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Planners to schedule processes within Tasks.
TaskPlanner:
- a daemon process can depend upon a regular process
- a regular process cannot depend upon a daemon process
- a non-ephemeral process cannot depend upon an ephemeral process
"""
import copy
import sys
import time
from collections import defaultdict, namedtuple
from functools import partial
class Planner(object):
    """
    Tracks a fixed set of process names plus the prerequisites of each one, and reports which
    processes may be started as others are marked running, finished or failed.
    """

    class InvalidSchedule(Exception):
        pass

    @staticmethod
    def filter_runnable(processes, dependencies):
        """Return the members of :processes that have no remaining entries in :dependencies."""
        return {name for name in processes if not dependencies.get(name)}

    @staticmethod
    def filter_dependencies(dependencies, given=frozenset()):
        """
        Return a deep copy of the process => prerequisite-set map :dependencies in which every
        prerequisite contained in :given has been struck out.  The input map is not modified.
        """
        remaining = copy.deepcopy(dependencies)
        for prerequisites in remaining.values():
            prerequisites -= given
        return remaining

    @classmethod
    def satisfiable(cls, processes, dependencies):
        """
        Decide whether every process in :processes can eventually be scheduled under
        :dependencies, i.e. whether the schedule is consistent and free of cycles.
        """
        unscheduled = copy.copy(processes)
        outstanding = copy.deepcopy(dependencies)
        while True:
            ready = cls.filter_runnable(unscheduled, outstanding)
            if not ready:
                # Nothing more can be started; whatever is left over can never run.
                break
            unscheduled -= ready
            outstanding = cls.filter_dependencies(outstanding, given=ready)
        return not unscheduled

    def __init__(self, processes, dependencies):
        """
        Build a planner over :processes; only the entries of :dependencies keyed by one of
        those processes are kept.  Raises InvalidSchedule if the schedule is unsatisfiable.
        """
        self._processes = set(processes)
        self._dependencies = {
            name: set(dependencies.get(name, [])) for name in self._processes
        }
        schedulable = self.satisfiable(self._processes, self._dependencies)
        if not schedulable:
            raise self.InvalidSchedule("Cycles detected in the task schedule!")
        self._running = set()
        self._finished = set()
        self._failed = set()

    @property
    def runnable(self):
        """Processes that are neither running nor terminal and whose prerequisites finished."""
        idle = self._processes - self._running - self._finished - self._failed
        unmet = self.filter_dependencies(self._dependencies, given=self._finished)
        return self.filter_runnable(idle, unmet)

    @property
    def processes(self):
        """A copy of the full set of process names."""
        return self._processes.copy()

    @property
    def running(self):
        """A copy of the set of running process names."""
        return self._running.copy()

    @property
    def finished(self):
        """A copy of the set of finished process names."""
        return self._finished.copy()

    @property
    def failed(self):
        """A copy of the set of failed process names."""
        return self._failed.copy()

    def reset(self, process):
        """Take a running :process out of the running set without making it terminal."""
        assert process in self._running
        assert process not in self._finished
        assert process not in self._failed
        self._running.discard(process)

    def set_running(self, process):
        """Mark :process as running; it must already be running or currently runnable."""
        assert process not in self._failed
        assert process not in self._finished
        assert process in self._running or process in self.runnable
        self._running.add(process)

    def set_finished(self, process):
        """Move a running :process into the finished set."""
        assert process in self._running
        assert process not in self._failed
        self._running.discard(process)
        self._finished.add(process)

    def set_failed(self, process):
        """Move a running :process into the failed set."""
        assert process in self._running
        assert process not in self._finished
        self._running.discard(process)
        self._failed.add(process)

    def is_complete(self):
        """True once every process is either finished or failed."""
        return (self._finished | self._failed) == self._processes
# Per-process scheduling attributes: minimum duration, daemon flag, failure cap, ephemeral flag.
TaskAttributes = namedtuple(
    'TaskAttributes',
    ['min_duration', 'is_daemon', 'max_failures', 'is_ephemeral'])
class TaskPlanner(object):
    """
    A planner for the processes part of a Thermos task, taking into account ephemeral and daemon
    bits, in addition to duration restrictions and failure limits.

                               is_daemon
         .--------------------------------------------------.
         |                                                  |
         |   clock gate     .----------------------.        |
         |  .-------------> | runnable && !waiting |        |
         v  |               `----------------------'        |
      .----------.                     |                    |
      | runnable |                     | set_running        |
      `----------'                     v                    |
           ^         lost         .---------.               |  !is_daemon   .----------.
           `----------------------| running |---------------+-------------> | finished |
           ^                      `---------'  add_success                  `----------'
           |                           |
           |   under failure limit     | add_failure
           `---------------------------+
                                       | past failure limit
                                       v
                                   .--------.
                                   | failed |
                                   `--------'

    Two details the diagram leaves out: an ephemeral process that passes its failure limit is
    marked finished rather than failed, and any process whose combined count of successes,
    failures and losses reaches TOTAL_RUN_LIMIT is marked failed.
    """

    InvalidSchedule = Planner.InvalidSchedule  # noqa
    INFINITY = sys.float_info.max
    TOTAL_RUN_LIMIT = sys.maxsize

    @classmethod
    def extract_dependencies(cls, task, process_filter=None):
        """
        Construct a set of processes and the process dependencies from a Thermos Task.

        Only processes accepted by :process_filter are considered (None keeps every truthy
        process, as with the builtin filter()).  Returns a (processes, dependencies) tuple:
        the set of selected process names, and a defaultdict mapping a process name to the
        set of names it depends on.

        Raises InvalidSchedule when an order constraint is empty or mixes selected and
        unselected processes, when a process is ordered after a daemon process, or when a
        non-ephemeral process is ordered after an ephemeral one.
        """
        process_map = dict((process.name().get(), process)
                           for process in filter(process_filter, task.processes()))
        processes = set(process_map)
        dependencies = defaultdict(set)
        if task.has_constraints():
            for constraint in task.constraints():
                # handle process orders
                process_names = constraint.order().get()
                process_name_set = set(process_names)
                all_selected = process_name_set.issubset(processes)
                # either all process_names must be in processes or none should be
                if all_selected == process_name_set.isdisjoint(processes):
                    raise cls.InvalidSchedule('Invalid process dependencies!')
                if not all_selected:
                    # The whole constraint refers to processes that were filtered out.
                    continue
                # Each process depends on the one immediately preceding it in the order.
                for k in range(1, len(process_names)):
                    pnk, pnk1 = process_names[k], process_names[k - 1]
                    if process_map[pnk1].daemon().get():
                        raise cls.InvalidSchedule(
                            'Process %s may not depend upon daemon process %s' % (pnk, pnk1))
                    if (not process_map[pnk].ephemeral().get()
                            and process_map[pnk1].ephemeral().get()):
                        raise cls.InvalidSchedule(
                            'Non-ephemeral process %s may not depend upon ephemeral process %s'
                            % (pnk, pnk1))
                    dependencies[pnk].add(pnk1)
        return (processes, dependencies)

    def __init__(self, task, clock=time, process_filter=None):
        """
        Build a planner for :task.

        :clock must expose a time() method and defaults to the time module.
        :process_filter, when given, must be a callable selecting the processes to plan.
        """
        self._filter = process_filter
        assert self._filter is None or callable(self._filter), (
            'TaskPlanner must be given callable process filter.')
        self._planner = Planner(*self.extract_dependencies(task, self._filter))
        self._clock = clock
        self._last_terminal = {}  # process => timestamp of last terminal state
        self._failures = defaultdict(int)
        self._successes = defaultdict(int)
        self._losses = defaultdict(int)
        self._attributes = {}
        self._ephemerals = set(
            process.name().get() for process in task.processes()
            if (self._filter is None or self._filter(process)) and process.ephemeral().get())
        for process in filter(self._filter, task.processes()):
            self._attributes[process.name().get()] = TaskAttributes(
                is_daemon=bool(process.daemon().get()),
                is_ephemeral=bool(process.ephemeral().get()),
                max_failures=process.max_failures().get(),
                min_duration=process.min_duration().get())

    def get_wait(self, process, timestamp=None):
        """
        Return how much longer :process must wait before it may run again, measured at
        :timestamp (the clock's current time when None).  The result is 0 for a process that
        has never terminated and may be negative once min_duration has already elapsed.
        """
        now = timestamp if timestamp is not None else self._clock.time()
        if process not in self._last_terminal:
            return 0
        return self._attributes[process].min_duration - (now - self._last_terminal[process])

    def _record_termination_time(self, process, timestamp=None):
        """Remember :timestamp (default: now) as the last time :process terminated."""
        timestamp = timestamp if timestamp is not None else self._clock.time()
        self._last_terminal[process] = timestamp

    def is_ready(self, process, timestamp=None):
        """True if :process has no remaining wait at :timestamp."""
        return self.get_wait(process, timestamp) <= 0

    def is_waiting(self, process, timestamp=None):
        """True if :process still has a positive wait at :timestamp."""
        return not self.is_ready(process, timestamp)

    @property
    def runnable(self):
        """The set of processes that are runnable right now, i.e. whose dependencies are
        satisfied and whose min_duration wait has elapsed."""
        return self.runnable_at(self._clock.time())

    @property
    def waiting(self):
        """The set of processes whose dependencies are satisfied but which are still held back
        by their min_duration wait."""
        return self.waiting_at(self._clock.time())

    def runnable_at(self, timestamp):
        """The set of dependency-satisfied processes that are ready at :timestamp."""
        return set(process for process in self._planner.runnable
                   if self.is_ready(process, timestamp=timestamp))

    def waiting_at(self, timestamp):
        """The set of dependency-satisfied processes that are still waiting at :timestamp."""
        return set(process for process in self._planner.runnable
                   if self.is_waiting(process, timestamp=timestamp))

    def min_wait(self, timestamp=None):
        """Return the current wait time for the next process to become runnable, 0 if something
        is ready immediately, or INFINITY (sys.float_info.max) if there are no waiters."""
        # Resolve the instant exactly once.  Reading the clock separately for the runnable and
        # waiting checks lets a process whose wait expires in between fall into neither set,
        # which would wrongly report INFINITY.
        now = timestamp if timestamp is not None else self._clock.time()
        if self.runnable_at(now):
            return 0
        waits = [self.get_wait(waiter, now) for waiter in self.waiting_at(now)]
        return min(waits) if waits else self.INFINITY

    def set_running(self, process):
        """Mark :process as running in the underlying planner."""
        self._planner.set_running(process)

    def add_failure(self, process, timestamp=None):
        """Increment the failure count of a process, and reset it to runnable if maximum number
        of failures has not been reached, or mark it as failed otherwise (ephemeral processes do
        not count towards the success of a task, and are hence marked finished instead)"""
        self._record_termination_time(process, timestamp)
        self._failures[process] += 1
        self._failure_transition(process)

    def has_reached_run_limit(self, process):
        """True once successes + failures + losses of :process reach TOTAL_RUN_LIMIT."""
        runs = self._successes[process] + self._failures[process] + self._losses[process]
        return runs >= self.TOTAL_RUN_LIMIT

    def _failure_transition(self, process):
        """Move :process to its next state after a failure has been counted."""
        if self.has_reached_run_limit(process):
            self._planner.set_failed(process)
            return
        attributes = self._attributes[process]
        # max_failures == 0 is treated as "no failure limit".
        if attributes.max_failures == 0 or self._failures[process] < attributes.max_failures:
            self._planner.reset(process)
        elif attributes.is_ephemeral:
            self._planner.set_finished(process)
        else:
            self._planner.set_failed(process)

    def add_success(self, process, timestamp=None):
        """Reset a process to runnable if it is a daemon, or mark it as finished otherwise."""
        self._record_termination_time(process, timestamp)
        self._successes[process] += 1
        self._success_transition(process)

    def _success_transition(self, process):
        """Move :process to its next state after a success has been counted."""
        if self.has_reached_run_limit(process):
            self._planner.set_failed(process)
            return
        if not self._attributes[process].is_daemon:
            self._planner.set_finished(process)
        else:
            self._planner.reset(process)

    def set_failed(self, process):
        """Force a process to be in failed state. E.g. kill -9 and you want it pinned failed."""
        self._planner.set_failed(process)

    def lost(self, process, timestamp=None):
        """Mark a process as lost. This sets its runnable state back to the previous runnable
        state and does not increment the failure count of the process.
        In order to prevent Thermos from overloading itself, even restarts of lost processes
        will be throttled by the `min_duration` uptime and capped at TOTAL_RUN_LIMIT."""
        self._record_termination_time(process, timestamp)
        self._losses[process] += 1
        self._lost_transition(process)

    def _lost_transition(self, process):
        """Move :process to its next state after a loss has been counted."""
        if self.has_reached_run_limit(process):
            self._planner.set_failed(process)
            return
        self._planner.reset(process)

    def is_complete(self):
        """A task is complete if all ordinary processes are finished or failed (there may still
        be running ephemeral processes)"""
        terminals = self._planner.finished.union(self._planner.failed).union(self._ephemerals)
        return self._planner.processes == terminals

    # TODO(wickman) Should we consider subclassing again?
    @property
    def failed(self):
        """The set of failed process names."""
        return self._planner.failed

    @property
    def running(self):
        """The set of running process names."""
        return self._planner.running

    @property
    def finished(self):
        """The set of finished process names."""
        return self._planner.finished