blob: 761e8624f318a61941f7484494e0e0167215958d [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import getpass
import json
import re
import pytest
from apache.aurora.config import AuroraConfig
from apache.aurora.config.schema.base import (
AppcImage,
Container,
CoordinatorSlaPolicy as PystachioCoordinatorSlaPolicy,
CountSlaPolicy as PystachioCountSlaPolicy,
Docker,
DockerImage,
ExecutorConfig,
HealthCheckConfig,
Job,
Mesos,
Metadata,
Mode,
Parameter,
PartitionPolicy as PystachioPartitionPolicy,
PercentageSlaPolicy as PystachioPercentageSlaPolicy,
SimpleTask,
Volume
)
from apache.aurora.config.thrift import (
InvalidConfig,
convert as convert_pystachio_to_thrift,
task_instance_from_job
)
from apache.thermos.config.schema import Process, Resources, Task
from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME, GOOD_IDENTIFIER_PATTERN_PYTHON
from gen.apache.aurora.api.ttypes import (
CoordinatorSlaPolicy,
CountSlaPolicy,
CronCollisionPolicy,
Identity,
JobKey,
Mode as ThriftMode,
PartitionPolicy,
PercentageSlaPolicy,
Resource
)
from gen.apache.aurora.test.constants import INVALID_IDENTIFIERS, VALID_IDENTIFIERS
HELLO_WORLD = Job(
name='hello_world',
role='john_doe',
environment='staging66',
cluster='smf1-test',
task=Task(
name='main',
processes=[Process(name='hello_world', cmdline='echo {{mesos.instance}}')],
resources=Resources(cpu=0.1, ram=64 * 1048576, disk=64 * 1048576, gpu=2),
)
)
HELLO_WORLD_EXECUTOR_DATA = {
"environment": "staging66",
"health_check_config": {
"health_checker": {
"http": {
"expected_response_code": 0,
"endpoint": "/health",
"expected_response": "ok"
}
},
"min_consecutive_successes": 1,
"initial_interval_secs": 15.0,
"max_consecutive_failures": 0,
"timeout_secs": 1.0,
"interval_secs": 10.0
},
"name": "hello_world",
"service": False,
"max_task_failures": 1,
"executor_config": {
"data": "",
"name": "AuroraExecutor"
},
"cron_collision_policy": "KILL_EXISTING",
"enable_hooks": False,
"cluster": "smf1-test",
"task": {
"processes": [
{
"daemon": False,
"name": "hello_world",
"ephemeral": False,
"max_failures": 1,
"min_duration": 5,
"cmdline": "echo {{mesos.instance}}",
"final": False
}
],
"name": "main",
"finalization_wait": 30,
"max_failures": 1,
"max_concurrency": 0,
"resources": {
"gpu": 2,
"disk": 67108864,
"ram": 67108864,
"cpu": 0.1
},
"constraints": [
]
},
"production": False,
"role": "john_doe",
"metadata": [
],
"lifecycle": {
"http": {
"graceful_shutdown_endpoint": "/quitquitquit",
"graceful_shutdown_wait_secs": 5,
"port": "health",
"shutdown_wait_secs": 5,
"shutdown_endpoint": "/abortabortabort"
}
},
"priority": 0
}
def test_simple_config():
job = convert_pystachio_to_thrift(HELLO_WORLD, ports=frozenset(['health']))
expected_key = JobKey(
role=HELLO_WORLD.role().get(),
environment=HELLO_WORLD.environment().get(),
name=HELLO_WORLD.name().get())
assert job.instanceCount == 1
tti = job.taskConfig
assert job.key == expected_key
assert job.owner == Identity(user=getpass.getuser())
assert job.cronSchedule is None
assert tti.job == expected_key
assert tti.isService is False
assert tti.production is False
assert tti.priority == 0
assert tti.maxTaskFailures == 1
assert tti.constraints == set()
assert tti.metadata == set()
assert tti.tier is None
assert Resource(numCpus=0.1) in list(tti.resources)
assert Resource(ramMb=64) in list(tti.resources)
assert Resource(diskMb=64) in list(tti.resources)
assert Resource(namedPort='health') in list(tti.resources)
assert Resource(numGpus=2) in list(tti.resources)
def test_config_with_tier():
config = HELLO_WORLD(tier='devel')
job = convert_pystachio_to_thrift(config)
assert job.taskConfig.tier == 'devel'
def test_config_with_docker_image():
image_name = 'some-image'
image_tag = 'some-tag'
job = convert_pystachio_to_thrift(
HELLO_WORLD(container=Mesos(image=DockerImage(name=image_name, tag=image_tag))))
assert job.taskConfig.container.mesos.image.appc is None
assert job.taskConfig.container.mesos.image.docker.name == image_name
assert job.taskConfig.container.mesos.image.docker.tag == image_tag
def test_config_with_appc_image():
image_name = 'some-image'
image_id = 'some-image-id'
job = convert_pystachio_to_thrift(
HELLO_WORLD(container=Mesos(image=AppcImage(name=image_name, image_id=image_id))))
assert job.taskConfig.container.mesos.image.docker is None
assert job.taskConfig.container.mesos.image.appc.name == image_name
assert job.taskConfig.container.mesos.image.appc.imageId == image_id
def test_config_with_volumes():
image_name = 'some-image'
image_tag = 'some-tag'
host_path = '/etc/secrets/role/'
container_path = '/etc/secrets/'
volume = Volume(host_path=host_path, container_path=container_path, mode=Mode('RO'))
container = Mesos(image=DockerImage(name=image_name, tag=image_tag), volumes=[volume])
job = convert_pystachio_to_thrift(HELLO_WORLD(container=container))
assert len(job.taskConfig.container.mesos.volumes) == 1
thrift_volume = job.taskConfig.container.mesos.volumes[0]
assert thrift_volume.hostPath == host_path
assert thrift_volume.containerPath == container_path
assert thrift_volume.mode == ThriftMode.RO
def test_docker_with_parameters():
helloworld = HELLO_WORLD(
container=Container(
docker=Docker(image='test_image', parameters=[Parameter(name='foo', value='bar')])
)
)
job = convert_pystachio_to_thrift(helloworld)
assert job.taskConfig.container.docker.image == 'test_image'
def test_config_with_options():
hwc = HELLO_WORLD(
production=True,
priority=200,
service=True,
cron_collision_policy='RUN_OVERLAP',
partition_policy=PystachioPartitionPolicy(delay_secs=10),
constraints={
'dedicated': 'root',
'cpu': 'x86_64'
},
environment='prod'
)
job = convert_pystachio_to_thrift(hwc)
assert job.instanceCount == 1
tti = job.taskConfig
assert tti.production
assert tti.priority == 200
assert tti.isService
assert job.cronCollisionPolicy == CronCollisionPolicy.RUN_OVERLAP
assert len(tti.constraints) == 2
assert job.key.environment == 'prod'
assert tti.partitionPolicy == PartitionPolicy(True, 10)
def test_disable_partition_policy():
hwc = HELLO_WORLD(
production=True,
priority=200,
service=True,
cron_collision_policy='RUN_OVERLAP',
partition_policy=PystachioPartitionPolicy(reschedule=False),
constraints={
'dedicated': 'root',
'cpu': 'x86_64'
},
environment='prod'
)
job = convert_pystachio_to_thrift(hwc)
assert job.taskConfig.partitionPolicy == PartitionPolicy(False, 0)
def test_no_sla_policy():
hwc = HELLO_WORLD()
job = convert_pystachio_to_thrift(hwc)
assert job.taskConfig.slaPolicy is None
def test_percentage_sla_policy():
hwc = HELLO_WORLD(
sla_policy=PystachioPercentageSlaPolicy(percentage=95.0, duration_secs=1800)
)
job = convert_pystachio_to_thrift(hwc)
assert job.taskConfig.slaPolicy.percentageSlaPolicy == PercentageSlaPolicy(
percentage=95.0,
durationSecs=1800)
assert job.taskConfig.slaPolicy.countSlaPolicy is None
assert job.taskConfig.slaPolicy.coordinatorSlaPolicy is None
def test_count_sla_policy():
hwc = HELLO_WORLD(
sla_policy=PystachioCountSlaPolicy(count=10, duration_secs=1800)
)
job = convert_pystachio_to_thrift(hwc)
assert job.taskConfig.slaPolicy.percentageSlaPolicy is None
assert job.taskConfig.slaPolicy.coordinatorSlaPolicy is None
assert job.taskConfig.slaPolicy.countSlaPolicy == CountSlaPolicy(
count=10,
durationSecs=1800)
def test_coordinator_sla_policy_defaults():
hwc = HELLO_WORLD(
sla_policy=PystachioCoordinatorSlaPolicy(coordinator_url='some-url')
)
job = convert_pystachio_to_thrift(hwc)
assert job.taskConfig.slaPolicy.percentageSlaPolicy is None
assert job.taskConfig.slaPolicy.countSlaPolicy is None
assert job.taskConfig.slaPolicy.coordinatorSlaPolicy == CoordinatorSlaPolicy(
coordinatorUrl='some-url',
statusKey='drain'
)
def test_coordinator_sla_policy_status_key():
hwc = HELLO_WORLD(
sla_policy=PystachioCoordinatorSlaPolicy(coordinator_url='some-url', status_key='key')
)
job = convert_pystachio_to_thrift(hwc)
assert job.taskConfig.slaPolicy.percentageSlaPolicy is None
assert job.taskConfig.slaPolicy.countSlaPolicy is None
assert job.taskConfig.slaPolicy.coordinatorSlaPolicy == CoordinatorSlaPolicy(
coordinatorUrl='some-url',
statusKey='key'
)
def test_config_with_ports():
hwc = HELLO_WORLD(
task=HELLO_WORLD.task()(
processes=[
Process(name='hello_world',
cmdline='echo {{thermos.ports[http]}} {{thermos.ports[admin]}}')
]
)
)
config = AuroraConfig(hwc)
job = config.job()
assert Resource(namedPort='http') in list(job.taskConfig.resources)
assert Resource(namedPort='admin') in list(job.taskConfig.resources)
def test_config_with_bad_resources():
MB = 1048576
hwtask = HELLO_WORLD.task()
convert_pystachio_to_thrift(HELLO_WORLD)
good_resources = [
Resources(cpu=1.0, ram=1 * MB, disk=1 * MB)
]
bad_resources = [
Resources(cpu=0, ram=1 * MB, disk=1 * MB),
Resources(cpu=1, ram=0 * MB, disk=1 * MB),
Resources(cpu=1, ram=1 * MB, disk=0 * MB),
Resources(cpu=1, ram=1 * MB - 1, disk=1 * MB),
Resources(cpu=1, ram=1 * MB, disk=1 * MB - 1)
]
for resource in good_resources:
convert_pystachio_to_thrift(HELLO_WORLD(task=hwtask(resources=resource)))
for resource in bad_resources:
with pytest.raises(ValueError):
convert_pystachio_to_thrift(HELLO_WORLD(task=hwtask(resources=resource)))
def test_unbound_references():
def job_command(cmdline):
return AuroraConfig(HELLO_WORLD(task=SimpleTask('hello_world', cmdline))).raw()
# bindingless and bad => good bindings should work
convert_pystachio_to_thrift(job_command('echo hello world'))
convert_pystachio_to_thrift(job_command('echo {{mesos.user}}')
.bind(mesos={'user': '{{mesos.role}}'}))
# unbound
with pytest.raises(InvalidConfig):
convert_pystachio_to_thrift(job_command('echo {{mesos.user}}'))
def test_cron_collision_policy():
cron_schedule = '*/10 * * * *'
CRON_HELLO_WORLD = HELLO_WORLD(cron_schedule=cron_schedule)
tti = convert_pystachio_to_thrift(CRON_HELLO_WORLD)
assert tti.cronSchedule == cron_schedule
assert tti.cronCollisionPolicy == CronCollisionPolicy.KILL_EXISTING
tti = convert_pystachio_to_thrift(CRON_HELLO_WORLD(cron_collision_policy='RUN_OVERLAP'))
assert tti.cronSchedule == cron_schedule
assert tti.cronCollisionPolicy == CronCollisionPolicy.RUN_OVERLAP
with pytest.raises(ValueError):
tti = convert_pystachio_to_thrift(CRON_HELLO_WORLD(cron_collision_policy='GARBAGE'))
def test_metadata_in_config():
job = convert_pystachio_to_thrift(HELLO_WORLD, metadata=[('alpha', 1)])
assert job.instanceCount == 1
tti = job.taskConfig
assert len(tti.metadata) == 1
pi = iter(tti.metadata).next()
assert pi.key == 'alpha'
assert pi.value == '1'
def test_config_with_metadata():
expected_metadata_tuples = frozenset([("city", "LA"), ("city", "SF")])
job = convert_pystachio_to_thrift(
HELLO_WORLD(metadata=[
Metadata(key=key, value=value)
for key, value in expected_metadata_tuples]))
tti = job.taskConfig
metadata_tuples = frozenset((key_value.key, key_value.value)
for key_value in tti.metadata)
assert metadata_tuples == expected_metadata_tuples
def test_config_with_key_collision_metadata():
input_metadata_tuples = frozenset([("city", "LA")])
job = convert_pystachio_to_thrift(
HELLO_WORLD(metadata=[
Metadata(key=key, value=value)
for key, value in input_metadata_tuples]), metadata=[('city', "SF")])
tti = job.taskConfig
metadata_tuples = frozenset((key_value.key, key_value.value)
for key_value in tti.metadata)
expected_metadata_tuples = frozenset([("city", "LA"), ("city", "SF")])
assert metadata_tuples == expected_metadata_tuples
def test_config_with_duplicate_metadata():
expected_metadata_tuples = frozenset([("city", "LA")])
job = convert_pystachio_to_thrift(
HELLO_WORLD(metadata=[
Metadata(key=key, value=value)
for key, value in expected_metadata_tuples]), metadata=[('city', "LA")])
tti = job.taskConfig
metadata_tuples = frozenset((key_value.key, key_value.value)
for key_value in tti.metadata)
assert metadata_tuples == expected_metadata_tuples
def test_config_with_implicit_thermos_executor_config():
job = convert_pystachio_to_thrift(HELLO_WORLD())
assert str(job.taskConfig.executorConfig.name) == AURORA_EXECUTOR_NAME
assert json.loads(job.taskConfig.executorConfig.data) == HELLO_WORLD_EXECUTOR_DATA
def test_config_with_explicit_thermos_executor_config():
job = convert_pystachio_to_thrift(
HELLO_WORLD(executor_config=ExecutorConfig(name=AURORA_EXECUTOR_NAME)))
assert str(job.taskConfig.executorConfig.name) == AURORA_EXECUTOR_NAME
assert json.loads(job.taskConfig.executorConfig.data) == HELLO_WORLD_EXECUTOR_DATA
def test_config_with_custom_executor_config():
job = convert_pystachio_to_thrift(
HELLO_WORLD(executor_config=ExecutorConfig(
name="CustomExecutor", data="{test:'payload'}")))
assert str(job.taskConfig.executorConfig.name) == "CustomExecutor"
assert str(job.taskConfig.executorConfig.data) == "{test:'payload'}"
def test_task_instance_from_job():
instance = task_instance_from_job(
Job(health_check_config=HealthCheckConfig(interval_secs=30)), 0, '')
assert instance is not None
def test_identifier_validation():
matcher = re.compile(GOOD_IDENTIFIER_PATTERN_PYTHON)
for identifier in VALID_IDENTIFIERS:
assert matcher.match(identifier)
for identifier in INVALID_IDENTIFIERS:
assert not matcher.match(identifier)
def test_mesos_hostname_in_task():
hw = HELLO_WORLD(task=Task(name="{{mesos.hostname}}"))
instance = task_instance_from_job(hw, 0, 'test_host')
assert str(instance.task().name()) == 'test_host'