blob: 6cee8608d0aee62b08b4cce81a2d6f5854462d42 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import datetime
import sys
from datetime import timedelta
from subprocess import CalledProcessError
import pytest
from airflow.decorators import task
from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
END_DATE = timezone.datetime(2016, 1, 2)
INTERVAL = timedelta(hours=12)
FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
TI_CONTEXT_ENV_VARS = [
'AIRFLOW_CTX_DAG_ID',
'AIRFLOW_CTX_TASK_ID',
'AIRFLOW_CTX_EXECUTION_DATE',
'AIRFLOW_CTX_DAG_RUN_ID',
]
PYTHON_VERSION = sys.version_info[0]
class TestPythonVirtualenvDecorator:
def test_add_dill(self, dag_maker):
@task.virtualenv(use_dill=True, system_site_packages=False)
def f():
"""Ensure dill is correctly installed."""
import dill # noqa: F401
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_no_requirements(self, dag_maker):
"""Tests that the python callable is invoked on task run."""
@task.virtualenv()
def f():
pass
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_no_system_site_packages(self, dag_maker):
@task.virtualenv(system_site_packages=False, python_version=PYTHON_VERSION, use_dill=True)
def f():
try:
import funcsigs # noqa: F401
except ImportError:
return True
raise Exception
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_system_site_packages(self, dag_maker):
@task.virtualenv(
system_site_packages=False,
requirements=['funcsigs'],
python_version=PYTHON_VERSION,
use_dill=True,
)
def f():
import funcsigs # noqa: F401
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_with_requirements_pinned(self, dag_maker):
@task.virtualenv(
system_site_packages=False,
requirements=['funcsigs==0.4'],
python_version=PYTHON_VERSION,
use_dill=True,
)
def f():
import funcsigs
if funcsigs.__version__ != '0.4':
raise Exception
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_unpinned_requirements(self, dag_maker):
@task.virtualenv(
system_site_packages=False,
requirements=['funcsigs', 'dill'],
python_version=PYTHON_VERSION,
use_dill=True,
)
def f():
import funcsigs # noqa: F401
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_fail(self, dag_maker):
@task.virtualenv()
def f():
raise Exception
with dag_maker():
ret = f()
with pytest.raises(CalledProcessError):
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_python_3(self, dag_maker):
@task.virtualenv(python_version=3, use_dill=False, requirements=['dill'])
def f():
import sys
print(sys.version)
try:
{}.iteritems()
except AttributeError:
return
raise Exception
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_with_args(self, dag_maker):
@task.virtualenv
def f(a, b, c=False, d=False):
if a == 0 and b == 1 and c and not d:
return True
else:
raise Exception
with dag_maker():
ret = f(0, 1, c=True)
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_return_none(self, dag_maker):
@task.virtualenv
def f():
return None
with dag_maker():
ret = f()
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_nonimported_as_arg(self, dag_maker):
@task.virtualenv
def f(_):
return None
with dag_maker():
ret = f(datetime.datetime.utcnow())
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)