blob: bb9dd2a7c801507469e3a3e9231c7c755277576d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import tempfile
import unittest
from unittest import TestCase
from liminal.build import liminal_apps_builder
from liminal.kubernetes import secret_util, volume_util
from liminal.runners.airflow import DummyDag
from liminal.runners.airflow.executors.kubernetes import KubernetesPodExecutor
from liminal.runners.airflow.tasks import python
from tests.util import dag_test_utils
class TestPythonTask(TestCase):
_VOLUME_NAME = 'myvol1'
_SECRET_NAME = 'aws'
def setUp(self) -> None:
volume_util.delete_local_volume(self._VOLUME_NAME)
secret_util.delete_local_secret(self._SECRET_NAME)
os.environ['TMPDIR'] = '/tmp'
self.temp_dir = tempfile.mkdtemp()
self.liminal_config = {
'volumes': [
{
'volume': self._VOLUME_NAME,
'local': {'path': self.temp_dir.replace("/var/folders", "/private/var/folders")},
}
],
'secrets': [{'secret': self._SECRET_NAME, 'remote_path': "/mnt"}],
}
volume_util.create_local_volumes(self.liminal_config, None)
secret_util.create_local_secrets(self.liminal_config)
liminal_apps_builder.build_liminal_apps(os.path.join(os.path.dirname(__file__), '../liminal'))
def test_apply_task_to_dag(self):
dag = dag_test_utils.create_dag()
task0 = self.__create_python_task(
dag,
'my_input_task',
None,
'my_python_task_img',
'python -u write_inputs.py',
env_vars={'NUM_FILES': 10, 'NUM_SPLITS': 3, 'AWS_CONFIG_FILE': '/mnt/credentials', 'AWS_PROFILE': 'dev'},
)
executor = KubernetesPodExecutor(
task_id='k8s', liminal_config=self.liminal_config, executor_config={'executor': 'k8s', 'name': 'mypod'}
)
executor.apply_task_to_dag(task=task0)
task1 = self.__create_python_task(
dag,
'my_output_task',
dag.tasks[0],
'my_parallelized_python_task_img',
'python -u write_outputs.py',
executors=3,
)
executor.apply_task_to_dag(task=task1)
for task in dag.tasks:
print(f'Executing task {task.task_id}')
task.execute(DummyDag('my_dag', task.task_id).context)
inputs_dir = os.path.join(self.temp_dir, 'inputs')
outputs_dir = os.path.join(self.temp_dir, 'outputs')
self.assertListEqual(sorted(os.listdir(self.temp_dir)), sorted(['outputs', 'inputs']))
inputs_dir_contents = sorted(os.listdir(inputs_dir))
self.assertListEqual(inputs_dir_contents, ['0', '1', '2'])
self.assertListEqual(
sorted(os.listdir(os.path.join(inputs_dir, '0'))),
['input0.json', 'input3.json', 'input6.json', 'input9.json'],
)
self.assertListEqual(
sorted(os.listdir(os.path.join(inputs_dir, '1'))), ['input1.json', 'input4.json', 'input7.json']
)
self.assertListEqual(
sorted(os.listdir(os.path.join(inputs_dir, '2'))), ['input2.json', 'input5.json', 'input8.json']
)
self.assertListEqual(
sorted(os.listdir(outputs_dir)),
[
'output0.txt',
'output1.txt',
'output2.txt',
'output3.txt',
'output4.txt',
'output5.txt',
'output6.txt',
'output7.txt',
'output8.txt',
'output9.txt',
],
)
for filename in os.listdir(outputs_dir):
with open(os.path.join(outputs_dir, filename)) as f:
expected_file_content = filename.replace('output', 'myval').replace('.txt', '')
self.assertEqual(f.read(), expected_file_content)
def __create_python_task(self, dag, task_id, parent, image, cmd, env_vars=None, executors=None):
self.liminal_config['volumes'] = [
{
'volume': self._VOLUME_NAME,
'local': {'path': self.temp_dir.replace("/var/folders", "/private/var/folders")},
}
]
self.liminal_config['executors'] = [
{
'executor': 'k8s',
'type': 'kubernetes',
}
]
task_config = {
'task': task_id,
'cmd': cmd,
'image': image,
'env_vars': env_vars if env_vars is not None else {},
'mounts': [{'mount': 'mymount', 'volume': self._VOLUME_NAME, 'path': '/mnt/vol1'}],
'secrets': [{'secret': self._SECRET_NAME}],
}
if executors:
task_config['executors'] = executors
return python.PythonTask(
task_id=task_id,
dag=dag,
liminal_config=self.liminal_config,
pipeline_config={'pipeline': 'my_pipeline'},
task_config=task_config,
parent=parent,
trigger_rule='all_success',
)
if __name__ == '__main__':
unittest.main()