# blob: b56ef84c264b99b25d7eb1f44aa211df164cfed6 [file]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from apache.thermos.common.planner import TaskPlanner
from apache.thermos.config.schema import Constraint, Process, Task
# Three bare processes with empty command lines, shared by the tests below.
p1, p2, p3 = (Process(name=name, cmdline="") for name in ("p1", "p2", "p3"))

# The same three processes, without and with an explicit ordering constraint.
unordered_task = Task(name="unordered", processes=[p1, p2, p3])
ordered_task = unordered_task(constraints=[{"order": ["p1", "p2", "p3"]}])

# A task with no processes; the tests derive their own tasks from it.
empty_task = Task(name="empty", processes=[])
def _(*processes):  # noqa
    """Return the given process names collected into a set."""
    names = set()
    names.update(processes)
    return names
# Shared empty set that the tests compare runnable/waiting sets against.
empty = set()
def approx_equal(a, b, tolerance=0.001):
    """Return True when ``a`` and ``b`` differ by strictly less than ``tolerance``.

    Args:
        a: First number to compare.
        b: Second number to compare.
        tolerance: Maximum absolute difference (exclusive). Defaults to 0.001,
            the value that was previously hard-coded.

    Returns:
        bool: ``abs(a - b) < tolerance``.
    """
    return abs(a - b) < tolerance
def test_task_construction():
    """Check the runnable set and completeness of freshly constructed planners."""
    # A task without processes has nothing to run and is complete right away.
    planner = TaskPlanner(empty_task)
    assert planner.runnable == empty
    assert planner.is_complete()

    # Without constraints every process is runnable from the start.
    planner = TaskPlanner(unordered_task)
    assert planner.runnable == _('p1', 'p2', 'p3')
    assert not planner.is_complete()

    # With an order constraint only the first process is runnable.
    planner = TaskPlanner(ordered_task)
    assert planner.runnable == _('p1')
    assert not planner.is_complete()
def test_task_finish_with_ephemerals():
    """Check how ephemeral processes affect planner completeness."""
    # A task made only of an ephemeral process is complete at every stage.
    pure_ephemeral = empty_task(processes=[p1(ephemeral=True)])
    planner = TaskPlanner(pure_ephemeral)
    assert planner.is_complete()
    planner.set_running('p1')
    assert planner.is_complete()
    planner.add_failure('p1')
    assert planner.is_complete()
    assert not planner.failed
    assert planner.finished == _('p1')

    # With a regular process alongside, completion follows the regular process.
    with_ephemeral = empty_task(processes=[p1, p2(ephemeral=True)])
    planner = TaskPlanner(with_ephemeral)
    assert not planner.is_complete()
    assert planner.runnable == _('p1', 'p2')
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_failure('p1')
    assert planner.is_complete()

    # Running and failing the ephemeral process afterwards changes nothing.
    planner.set_running('p2')
    assert planner.is_complete()
    planner.add_failure('p2')
    assert planner.is_complete()
    assert planner.failed == _('p1')
def test_task_finish_with_daemons():
    """Check that a daemon process completes only once it hits its failure limit."""
    # Daemon is still restricted to the failure limit
    planner = TaskPlanner(empty_task(processes=[p1(daemon=True)]))
    assert not planner.is_complete()
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_failure('p1')
    assert planner.is_complete()

    # Resilient to two failures
    two_failure_daemon = p1(daemon=True, max_failures=2)
    planner = TaskPlanner(empty_task(processes=[two_failure_daemon]))
    assert not planner.is_complete()
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_failure('p1')
    assert not planner.is_complete()
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_failure('p1')
    assert planner.is_complete()

    # Can swallow successes
    two_failure_daemon = p1(daemon=True, max_failures=2)
    planner = TaskPlanner(empty_task(processes=[two_failure_daemon]))
    assert not planner.is_complete()
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_failure('p1')
    assert not planner.is_complete()
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_success('p1')
    assert not planner.is_complete()
    planner.set_running('p1')
    assert not planner.is_complete()
    planner.add_failure('p1')
    assert planner.is_complete()
def test_task_finish_with_daemon_ephemerals():
    """Check that a running daemon+ephemeral process does not hold the task open."""
    background = p2(daemon=True, ephemeral=True, max_failures=2)
    planner = TaskPlanner(empty_task(processes=[p1, background]))
    assert not planner.is_complete()

    planner.set_running('p1')
    planner.set_running('p2')
    assert not planner.is_complete()

    # p2 is still marked running, yet the task completes once p1 succeeds.
    planner.add_success('p1')
    assert planner.is_complete()
def test_task_process_cannot_depend_upon_daemon():
    """Check that ordering a process after a daemon raises InvalidSchedule."""
    task = empty_task(
        processes=[p1(daemon=True), p2],
        constraints=[{'order': ['p1', 'p2']}])
    with pytest.raises(TaskPlanner.InvalidSchedule):
        TaskPlanner(task)
def test_task_non_ephemeral_process_cannot_depend_on_ephemeral_process():
    """Check that ordering a regular process after an ephemeral one raises InvalidSchedule."""
    task = empty_task(
        processes=[p1(ephemeral=True), p2],
        constraints=[{'order': ['p1', 'p2']}])
    with pytest.raises(TaskPlanner.InvalidSchedule):
        TaskPlanner(task)
def test_task_failed_predecessor_does_not_make_process_runnable():
    """Check that p2 becomes runnable after p1 succeeds but not after p1 fails."""
    # Predecessor succeeds: the successor is released.
    planner = TaskPlanner(
        empty_task(processes=[p1, p2], constraints=[{'order': ['p1', 'p2']}]))
    planner.set_running('p1')
    planner.add_success('p1')
    assert 'p2' in planner.runnable
    assert not planner.is_complete()

    # Predecessor fails: the successor stays blocked and the task is not complete.
    planner = TaskPlanner(
        empty_task(processes=[p1, p2], constraints=[{'order': ['p1', 'p2']}]))
    planner.set_running('p1')
    planner.add_failure('p1')
    assert 'p2' not in planner.runnable
    assert not planner.is_complete()
def test_task_daemon_duration():
    """Check when a daemon with min_duration=10 becomes runnable again after each run."""
    daemon = p1(daemon=True, max_failures=2, min_duration=10)
    planner = TaskPlanner(empty_task(processes=[daemon]))
    assert 'p1' in planner.runnable

    # Success recorded at t=5: not runnable at t=5 or t=10, runnable at t=15.
    planner.set_running('p1')
    planner.add_success('p1', timestamp=5)
    assert 'p1' not in planner.runnable_at(timestamp=5)
    assert 'p1' not in planner.runnable_at(timestamp=10)
    assert 'p1' in planner.runnable_at(timestamp=15)
    assert 'p1' in planner.runnable_at(timestamp=20)

    # First failure recorded at t=10: runnable again at t=20.
    planner.set_running('p1')
    planner.add_failure('p1', timestamp=10)
    assert 'p1' not in planner.runnable_at(timestamp=10)
    assert 'p1' not in planner.runnable_at(timestamp=15)
    assert 'p1' in planner.runnable_at(timestamp=20)
    assert 'p1' in planner.runnable_at(timestamp=25)

    # Second failure recorded at t=15: never runnable again.
    planner.set_running('p1')
    planner.add_failure('p1', timestamp=15)
    assert 'p1' not in planner.runnable_at(timestamp=15)
    assert 'p1' not in planner.runnable_at(timestamp=20)
    assert 'p1' not in planner.runnable_at(timestamp=25)  # task past maximum failure limit
    assert 'p1' not in planner.runnable_at(timestamp=30)
def test_task_waits():
    """Check waiting_at, get_wait, min_wait and runnable_at for daemons with min_duration."""
    daemon = p1(daemon=True, max_failures=0)
    planner = TaskPlanner(empty_task(processes=[
        daemon(name='d3', min_duration=3),
        daemon(name='d5', min_duration=5),
        daemon(name='d7', min_duration=7),
    ]))
    assert planner.runnable_at(timestamp=0) == _('d3', 'd5', 'd7')
    assert planner.min_wait(timestamp=0) == 0

    # d3 finishes at t=0 and waits, while d5 and d7 remain runnable.
    planner.set_running('d3')
    planner.add_success('d3', timestamp=0)
    assert planner.runnable_at(timestamp=0) == _('d5', 'd7')
    assert planner.waiting_at(timestamp=0) == _('d3')
    assert approx_equal(planner.get_wait('d3', timestamp=0), 3)
    assert approx_equal(planner.min_wait(timestamp=0), 0)
    assert approx_equal(planner.min_wait(timestamp=1), 0)
    assert planner.waiting_at(timestamp=3) == empty
    assert planner.runnable_at(timestamp=3) == _('d3', 'd5', 'd7')

    # d3 and d7 start; d7 finishes at t=1 while d5 has not started yet.
    planner.set_running('d3')
    planner.set_running('d7')
    planner.add_success('d7', timestamp=1)
    assert approx_equal(planner.min_wait(timestamp=2), 0)

    # After d5 starts, min_wait at t=2 is 6.
    planner.set_running('d5')
    assert approx_equal(planner.min_wait(timestamp=2), 6)

    # d5 and d3 finish at t=2; each becomes runnable again at a later timestamp.
    planner.add_success('d5', timestamp=2)
    planner.add_success('d3', timestamp=2)
    assert approx_equal(planner.min_wait(timestamp=3), 2)
    assert planner.runnable_at(timestamp=2) == empty
    assert planner.runnable_at(timestamp=5) == _('d3')
    assert planner.runnable_at(timestamp=7) == _('d3', 'd5')
    assert planner.runnable_at(timestamp=8) == _('d3', 'd5', 'd7')
def test_task_fails():
    """Check planner state after failures, with and without max_failures=0."""
    template = p1(max_failures=1, min_duration=1)

    # Both processes fail once: nothing is runnable and the wait is infinite.
    planner = TaskPlanner(empty_task(processes=[template(name='d1'), template(name='d2')]))
    assert planner.runnable_at(timestamp=0) == _('d1', 'd2')
    planner.set_running('d1')
    planner.set_running('d2')
    assert planner.runnable_at(timestamp=0) == empty
    planner.add_failure('d1', timestamp=0)
    planner.add_failure('d2', timestamp=0)
    assert planner.runnable_at(timestamp=0) == empty
    assert planner.min_wait(timestamp=0) == TaskPlanner.INFINITY

    # Same outcome when d1 is marked failed via set_failed.
    planner = TaskPlanner(empty_task(processes=[template(name='d1'), template(name='d2')]))
    assert planner.runnable_at(timestamp=0) == _('d1', 'd2')
    planner.set_running('d1')
    planner.set_failed('d1')
    assert planner.runnable_at(timestamp=0) == _('d2')
    planner.set_running('d2')
    assert planner.runnable_at(timestamp=0) == empty
    planner.add_failure('d2', timestamp=0)
    assert planner.runnable_at(timestamp=0) == empty
    assert planner.min_wait(timestamp=0) == TaskPlanner.INFINITY

    # test max_failures=0 && daemon==True ==> retries forever
    forever = template(name='d1', max_failures=0, daemon=True)
    planner = TaskPlanner(empty_task(processes=[forever]))
    # The loop variable is not named ``_`` because ``_`` is this module's set helper.
    for _attempt in range(10):
        planner.set_running('d1')
        assert 'd1' in planner.running
        assert 'd1' not in planner.failed
        planner.add_failure('d1')
        assert 'd1' not in planner.running
        assert 'd1' not in planner.failed
        planner.set_running('d1')
        assert 'd1' in planner.running
        assert 'd1' not in planner.failed
        planner.add_success('d1')
        assert 'd1' not in planner.running
        assert 'd1' not in planner.failed
        assert 'd1' not in planner.finished

    # Non-daemon with max_failures=0: a failure is not terminal, a success finishes it.
    planner = TaskPlanner(empty_task(processes=[template(name='d1', max_failures=0)]))
    planner.set_running('d1')
    assert 'd1' in planner.running
    assert 'd1' not in planner.failed
    planner.add_failure('d1')
    assert 'd1' not in planner.running
    assert 'd1' not in planner.failed
    planner.set_running('d1')
    assert 'd1' in planner.running
    assert 'd1' not in planner.failed
    planner.add_success('d1')
    assert 'd1' not in planner.running
    assert 'd1' not in planner.failed
    assert 'd1' in planner.finished
def test_task_lost():
    """Check min_wait after successes, failures and a lost run for max_failures=2."""
    template = p1(max_failures=2, min_duration=1)

    # regular success behavior
    planner = TaskPlanner(empty_task(processes=[template(name='d1')]))
    assert planner.runnable_at(timestamp=0) == _('d1')
    planner.set_running('d1')
    planner.add_success('d1', timestamp=0)
    assert planner.min_wait(timestamp=0) == TaskPlanner.INFINITY

    # regular failure behavior
    planner = TaskPlanner(empty_task(processes=[template(name='d1')]))
    assert planner.runnable_at(timestamp=0) == _('d1')
    planner.set_running('d1')
    planner.add_failure('d1', timestamp=1)
    assert approx_equal(planner.min_wait(timestamp=1), 1)
    planner.set_running('d1')
    planner.add_failure('d1', timestamp=3)
    assert planner.min_wait(timestamp=3) == TaskPlanner.INFINITY

    # lost behavior: after the lost run, a further failure is what ends the retries
    planner = TaskPlanner(empty_task(processes=[template(name='d1')]))
    assert planner.runnable_at(timestamp=0) == _('d1')
    planner.set_running('d1')
    planner.add_failure('d1', timestamp=1)
    assert approx_equal(planner.min_wait(timestamp=1), 1)
    planner.set_running('d1')
    planner.lost('d1', timestamp=2)
    assert approx_equal(planner.min_wait(timestamp=2), 1)
    planner.set_running('d1')
    planner.add_failure('d1', timestamp=3)
    assert planner.min_wait(timestamp=3) == TaskPlanner.INFINITY
def test_task_filters():
    """Check process_filter with constraints inside and across the filtered set."""
    def name_starts_with(prefix):
        # Build a process_filter selecting processes whose name starts with ``prefix``.
        return lambda proc: proc.name().get().startswith(prefix)

    template = p1
    names = ('p1', 'p2', 'p3', 'f1', 'f2', 'f3')
    task = empty_task(
        processes=[template(name=name) for name in names],
        constraints=[
            Constraint(order=['p1', 'p2', 'p3']),
            Constraint(order=['f1', 'f2', 'f3']),
        ])

    # Each order constraint names only 'p' or only 'f' processes, so both filters build.
    assert TaskPlanner(task, process_filter=name_starts_with('p'))
    assert TaskPlanner(task, process_filter=name_starts_with('f'))

    # An order constraint that spans both groups is rejected under either filter.
    with pytest.raises(TaskPlanner.InvalidSchedule):
        TaskPlanner(task(constraints=[Constraint(order=['p1', 'f1'])]),
                    process_filter=name_starts_with('p'))
    with pytest.raises(TaskPlanner.InvalidSchedule):
        TaskPlanner(task(constraints=[Constraint(order=['p1', 'f1'])]),
                    process_filter=name_starts_with('f'))
def test_task_max_runs():
    """Check that a planner with TOTAL_RUN_LIMIT = 3 stops scheduling after three runs."""
    class CappedTaskPlanner(TaskPlanner):
        TOTAL_RUN_LIMIT = 3

    template = p1(daemon=True, max_failures=0)

    # Non-daemon process: a failure, a lost run and a second failure.
    capped = CappedTaskPlanner(
        empty_task(processes=[template(name='d1', max_failures=100, daemon=False)]))
    capped.set_running('d1')
    capped.add_failure('d1', timestamp=1)
    assert 'd1' in capped.runnable
    capped.set_running('d1')
    capped.lost('d1', timestamp=2)
    assert 'd1' in capped.runnable
    capped.set_running('d1')
    capped.add_failure('d1', timestamp=3)
    assert 'd1' not in capped.runnable

    # Daemon process: a failure, a success and a lost run.
    capped = CappedTaskPlanner(
        empty_task(processes=[template(name='d1', max_failures=100)]))
    capped.set_running('d1')
    capped.add_failure('d1', timestamp=1)
    assert 'd1' in capped.runnable
    capped.set_running('d1')
    capped.add_success('d1', timestamp=2)
    assert 'd1' in capped.runnable
    capped.set_running('d1')
    capped.lost('d1', timestamp=3)
    assert 'd1' not in capped.runnable