#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase

from liminal.core.config.defaults import default_configs


class TestDefaultsTaskConfig(TestCase):
    def test_apply(self):
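        """Task defaults from superliminal are merged by task type into pipeline tasks and before/after tasks."""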
        pipeline = {
            'pipeline': 'mypipe',
            "tasks": [
                {
                    "task": "middle",
                    "type": "spark",
                    "task_param": "task_middle_param"
                },
                {
                    "task": "end",
                    "type": "python",
                    "task_param": "task_end_param"
                }
            ]
        }
        subliminal = {'pipelines': [pipeline]}
        superliminal = {
            "task_defaults": {
                "python": {
                    "env_vars": {
                        "env2": "env2value"
                    }
                },
                "spark": {
                    "task_param": "task_spark_param",
                    "executor": "emr"
                }
            }
        }
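        # the 'start' before task gains the python env_vars default, the spark task gains
        # the executor default, and explicit task_param values win over the defaults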
        expected = {'pipeline': 'mypipe',
                    'tasks': [{'env_vars': {'env1': 'env1value', 'env2': 'env2value'},
                               'task': 'start',
                               'type': 'python'},
                              {'executor': 'emr',
                               'task': 'middle',
                               'task_param': 'task_middle_param',
                               'type': 'spark'},
                              {'env_vars': {'env2': 'env2value'},
                               'task': 'end',
                               'task_param': 'task_end_param',
                               'type': 'python'}]}

        self.assertEqual(expected,
                         default_configs.apply_task_defaults(
                             subliminal,
                             superliminal,
                             pipeline=pipeline,
                             superliminal_before_tasks=[{
                                 "task": "start",
                                 "type": "python",
                                 "env_vars": {
                                     "env1": "env1value"
                                 }
                             }],
                             superliminal_after_tasks=[]))

    def test_missing_tasks_from_supr(self):
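        """A task's own values take precedence over superliminal task defaults of the same name."""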
        pipeline = {
            'pipeline': 'mypipe',
            "tasks": [
                {
                    "task": "middle",
                    "type": "spark",
                    "task_param": "task_middle_param"
                },
                {
                    "task": "end",
                    "type": "python",
                    "task_param": "task_end_param"
                }
            ]
        }
        subliminal = {'pipelines': [pipeline]}
        superliminal = {
            "task_defaults": {
                "python": {
                    "env_vars": {
                        "env2": "env2value"
                    }
                },
                "spark": {
                    "task_param": "task_spark_param"
                }
            }
        }
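        # the spark task keeps its explicit task_param; only the python task gains the env_vars default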
        expected = {'pipeline': 'mypipe',
                    'tasks': [{'task': 'middle',
                               'task_param': 'task_middle_param',
                               'type': 'spark'},
                              {'env_vars': {'env2': 'env2value'},
                               'task': 'end',
                               'task_param': 'task_end_param',
                               'type': 'python'}]}

        self.assertEqual(expected,
                         default_configs.apply_task_defaults(
                             subliminal,
                             superliminal,
                             pipeline=pipeline,
                             superliminal_before_tasks=[],
                             superliminal_after_tasks=[]))

    def test_missing_tasks_from_sub(self):
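        """A pipeline with no tasks of its own still receives the superliminal before and after tasks."""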
        pipeline = {
            'pipeline': 'mypipe'
        }
        subliminal = {'pipelines': [pipeline]}
        superliminal = {
            "task_defaults": {
                "python": {
                    "env_vars": {
                        "env2": "env2value"
                    }
                },
                "spark": {
                    "task_param": "task_start_param"
                }
            }
        }
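        # with no tasks defined in the subliminal pipeline, only the superliminal before/after tasks appear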
        expected = {'pipeline': 'mypipe',
                    'tasks': [{'task': 'start',
                               'task_param': 'task_middle_param',
                               'type': 'spark'},
                              {'env_vars': {'env2': 'env2value'},
                               'task': 'end',
                               'task_param': 'task_end_param',
                               'type': 'end'}]}

        self.assertEqual(expected,
                         default_configs.apply_task_defaults(
                             subliminal,
                             superliminal,
                             pipeline=pipeline,
                             superliminal_before_tasks=[{
                                 "task": "start",
                                 "task_param": "task_middle_param",
                                 "type": "spark"
                             }],
                             superliminal_after_tasks=[{
                                 "env_vars": {
                                     "env2": "env2value"
                                 },
                                 "task": "end",
                                 "task_param": "task_end_param",
                                 "type": "end"
                             }]))