#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import contextlib
import json
import logging
import os
import re
import shutil
import sys
from argparse import ArgumentParser
from contextlib import contextmanager, redirect_stdout
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING
from unittest import mock
from unittest.mock import sentinel
import pendulum
import pytest
import sqlalchemy.exc
from airflow.cli import cli_parser
from airflow.cli.commands import task_command
from airflow.cli.commands.task_command import LoggerMutationHelper
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound
from airflow.models import DagBag, DagRun, Pool, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunType
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_pools, clear_db_runs
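# All tests in this module exercise the CLI against the metadata database, hence the module-level db_test marker.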
pytestmark = pytest.mark.db_test
if TYPE_CHECKING:
from airflow.models.dag import DAG
DEFAULT_DATE = timezone.datetime(2022, 1, 1)
ROOT_FOLDER = Path(__file__).parents[3].resolve()
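# Remove any task instances and dag runs left behind for the given dag id.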
def reset(dag_id):
with create_session() as session:
tis = session.query(TaskInstance).filter_by(dag_id=dag_id)
tis.delete()
runs = session.query(DagRun).filter_by(dag_id=dag_id)
runs.delete()
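# Temporarily relocate a file to new_path for the duration of the with-block, restoring it to old_path on exit.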
@contextmanager
def move_back(old_path, new_path):
shutil.move(old_path, new_path)
yield
shutil.move(new_path, old_path)
# TODO: Check if tests need side effects - locally there's a missing DAG
class TestCliTasks:
run_id = "TEST_RUN_ID"
dag_id = "example_python_operator"
parser: ArgumentParser
dagbag: DagBag
dag: DAG
dag_run: DagRun
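# A single manual DagRun for example_python_operator is created once per class and shared by the tests below.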
@classmethod
def setup_class(cls):
cls.dagbag = DagBag(include_examples=True)
cls.parser = cli_parser.get_parser()
clear_db_runs()
cls.dag = cls.dagbag.get_dag(cls.dag_id)
cls.dagbag.sync_to_db()
data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
cls.dag_run = cls.dag.create_dagrun(
state=State.NONE,
run_id=cls.run_id,
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
data_interval=data_interval,
)
@classmethod
def teardown_class(cls) -> None:
clear_db_runs()
@pytest.mark.execution_timeout(120)
def test_cli_list_tasks(self):
for dag_id in self.dagbag.dags:
args = self.parser.parse_args(["tasks", "list", dag_id])
task_command.task_list(args)
args = self.parser.parse_args(["tasks", "list", "example_bash_operator", "--tree"])
task_command.task_list(args)
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
def test_test(self):
"""Test the `airflow test` command"""
args = self.parser.parse_args(
["tasks", "test", "example_python_operator", "print_the_context", "2018-01-01"]
)
with redirect_stdout(StringIO()) as stdout:
task_command.task_test(args)
# Check that print output and log messages are shown
assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
@mock.patch("airflow.utils.timezone.utcnow")
def test_test_no_execution_date(self, mock_utcnow):
"""Test the `airflow test` command"""
now = pendulum.now("UTC")
mock_utcnow.return_value = now
ds = now.strftime("%Y%m%d")
args = self.parser.parse_args(["tasks", "test", "example_python_operator", "print_the_context"])
with redirect_stdout(StringIO()) as stdout:
task_command.task_test(args)
# Check that print output and log messages are shown
assert f"'example_python_operator__print_the_context__{ds}'" in stdout.getvalue()
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
def test_test_with_existing_dag_run(self, caplog):
"""Test the `airflow test` command"""
task_id = "print_the_context"
args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
with caplog.at_level("INFO", logger="airflow.task"):
task_command.task_test(args)
assert (
f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}, run_id={self.run_id}, "
in caplog.text
)
@pytest.mark.enable_redact
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
def test_test_filters_secrets(self, capsys):
"""Test ``airflow test`` does not print secrets to stdout.
Output should be filtered by SecretsMasker.
"""
password = "somepassword1234!"
logging.getLogger("airflow.task").filters[0].add_mask(password)
args = self.parser.parse_args(
["tasks", "test", "example_python_operator", "print_the_context", "2018-01-01"],
)
with mock.patch("airflow.models.TaskInstance.run", side_effect=lambda *_, **__: print(password)):
task_command.task_test(args)
assert capsys.readouterr().out.endswith("***\n")
not_password = "!4321drowssapemos"
with mock.patch("airflow.models.TaskInstance.run", side_effect=lambda *_, **__: print(not_password)):
task_command.task_test(args)
assert capsys.readouterr().out.endswith(f"{not_password}\n")
def test_cli_test_different_path(self, session, tmp_path):
"""
When the dag processor has a different dags folder
from the worker, ``airflow tasks run --local`` should still work.
"""
repo_root = Path(__file__).parents[3]
orig_file_path = repo_root / "tests/dags/test_dags_folder.py"
orig_dags_folder = orig_file_path.parent
# parse dag in original path
with conf_vars({("core", "dags_folder"): orig_dags_folder.as_posix()}):
dagbag = DagBag(include_examples=False)
dag = dagbag.get_dag("test_dags_folder")
dagbag.sync_to_db(session=session)
execution_date = pendulum.now("UTC")
data_interval = dag.timetable.infer_manual_data_interval(run_after=execution_date)
dag.create_dagrun(
state=State.NONE,
run_id="abc123",
run_type=DagRunType.MANUAL,
execution_date=execution_date,
data_interval=data_interval,
session=session,
)
session.commit()
# now let's move the file and additionally update the dags folder to the new path.
# since dags_folder then points correctly to the file, airflow should still be able to find the dag.
new_file_path = tmp_path / orig_file_path.name
new_dags_folder = new_file_path.parent
with move_back(orig_file_path, new_file_path), conf_vars(
{("core", "dags_folder"): new_dags_folder.as_posix()}
):
ser_dag = (
session.query(SerializedDagModel)
.filter(SerializedDagModel.dag_id == "test_dags_folder")
.one()
)
# confirm that the serialized dag location has not been updated
assert ser_dag.fileloc == orig_file_path.as_posix()
assert ser_dag.data["dag"]["_processor_dags_folder"] == orig_dags_folder.as_posix()
assert ser_dag.data["dag"]["fileloc"] == orig_file_path.as_posix()
assert ser_dag.dag._processor_dags_folder == orig_dags_folder.as_posix()
from airflow.settings import DAGS_FOLDER
assert DAGS_FOLDER == new_dags_folder.as_posix() != orig_dags_folder.as_posix()
task_command.task_run(
self.parser.parse_args(
[
"tasks",
"run",
"--ignore-all-dependencies",
"--local",
"test_dags_folder",
"task",
"abc123",
]
)
)
ti = (
session.query(TaskInstance)
.filter(
TaskInstance.task_id == "task",
TaskInstance.dag_id == "test_dags_folder",
TaskInstance.run_id == "abc123",
TaskInstance.map_index == -1,
)
.one()
)
assert ti.state == "success"
# verify that the file was in a different location when the task ran
assert ti.xcom_pull(ti.task_id) == new_file_path.as_posix()
@mock.patch("airflow.cli.commands.task_command.LocalTaskJobRunner")
def test_run_with_existing_dag_run_id(self, mock_local_job_runner):
"""
Test that we can run with an existing dag_run_id
"""
task0_id = self.dag.task_ids[0]
args0 = [
"tasks",
"run",
"--ignore-all-dependencies",
"--local",
self.dag_id,
task0_id,
self.run_id,
]
mock_local_job_runner.return_value.job_type = "LocalTaskJob"
task_command.task_run(self.parser.parse_args(args0), dag=self.dag)
mock_local_job_runner.assert_called_once_with(
job=mock.ANY,
task_instance=mock.ANY,
mark_success=False,
ignore_all_deps=True,
ignore_depends_on_past=False,
wait_for_past_depends_before_skipping=False,
ignore_task_deps=False,
ignore_ti_state=False,
pickle_id=None,
pool=None,
external_executor_id=None,
)
@pytest.mark.parametrize(
"from_db",
[True, False],
)
@mock.patch("airflow.cli.commands.task_command.LocalTaskJobRunner")
def test_run_with_read_from_db(self, mock_local_job_runner, caplog, from_db):
"""
Test that we can run with --read-from-db
"""
task0_id = self.dag.task_ids[0]
args0 = [
"tasks",
"run",
"--ignore-all-dependencies",
"--local",
self.dag_id,
task0_id,
self.run_id,
] + (["--read-from-db"] if from_db else [])
mock_local_job_runner.return_value.job_type = "LocalTaskJob"
task_command.task_run(self.parser.parse_args(args0))
mock_local_job_runner.assert_called_once_with(
job=mock.ANY,
task_instance=mock.ANY,
mark_success=False,
ignore_all_deps=True,
ignore_depends_on_past=False,
wait_for_past_depends_before_skipping=False,
ignore_task_deps=False,
ignore_ti_state=False,
pickle_id=None,
pool=None,
external_executor_id=None,
)
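# With --read-from-db the task pulls the serialized DAG from the database instead of filling a
# DagBag from files, so the "Filling up the DagBag" message should only be logged without the flag.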
assert ("Filling up the DagBag from" in caplog.text) != from_db
@mock.patch("airflow.cli.commands.task_command.LocalTaskJobRunner")
def test_run_raises_when_theres_no_dagrun(self, mock_local_job):
"""
Test that run raises DagRunNotFound when there's a run_id but no dag_run
"""
dag_id = "test_run_ignores_all_dependencies"
dag = self.dagbag.get_dag(dag_id)
task0_id = "test_run_dependent_task"
run_id = "TEST_RUN_ID"
args0 = [
"tasks",
"run",
"--ignore-all-dependencies",
"--local",
dag_id,
task0_id,
run_id,
]
with pytest.raises(DagRunNotFound):
task_command.task_run(self.parser.parse_args(args0), dag=dag)
def test_cli_test_with_params(self):
task_command.task_test(
self.parser.parse_args(
[
"tasks",
"test",
"example_passing_params_via_test_command",
"run_this",
DEFAULT_DATE.isoformat(),
"--task-params",
'{"foo":"bar"}',
]
)
)
task_command.task_test(
self.parser.parse_args(
[
"tasks",
"test",
"example_passing_params_via_test_command",
"also_run_this",
DEFAULT_DATE.isoformat(),
"--task-params",
'{"foo":"bar"}',
]
)
)
def test_cli_test_with_env_vars(self):
with redirect_stdout(StringIO()) as stdout:
task_command.task_test(
self.parser.parse_args(
[
"tasks",
"test",
"example_passing_params_via_test_command",
"env_var_test_task",
DEFAULT_DATE.isoformat(),
"--env-vars",
'{"foo":"bar"}',
]
)
)
output = stdout.getvalue()
assert "foo=bar" in output
assert "AIRFLOW_TEST_MODE=True" in output
@mock.patch("airflow.triggers.file.os.path.getmtime", return_value=0)
@mock.patch("airflow.triggers.file.glob", return_value=["/tmp/test"])
@mock.patch("airflow.triggers.file.os.path.isfile", return_value=True)
@mock.patch("airflow.sensors.filesystem.FileSensor.poke", return_value=False)
def test_cli_test_with_deferrable_operator(self, mock_poke, mock_is_file, mock_glob, mock_getmtime):
with redirect_stdout(StringIO()) as stdout:
task_command.task_test(
self.parser.parse_args(
[
"tasks",
"test",
"example_sensors",
"wait_for_file_async",
DEFAULT_DATE.isoformat(),
]
)
)
output = stdout.getvalue()
assert "wait_for_file_async completed successfully as /tmp/temporary_file_for_testing found" in output
@pytest.mark.parametrize(
"option",
[
"--ignore-all-dependencies",
"--ignore-depends-on-past",
"--ignore-dependencies",
"--force",
],
)
def test_cli_run_invalid_raw_option(self, option: str):
with pytest.raises(
AirflowException,
match="Option --raw does not work with some of the other options on this command.",
):
task_command.task_run(
self.parser.parse_args(
[ # type: ignore
"tasks",
"run",
"example_bash_operator",
"runme_0",
DEFAULT_DATE.isoformat(),
"--raw",
option,
]
)
)
def test_cli_run_mutually_exclusive(self):
with pytest.raises(AirflowException, match="Option --raw and --local are mutually exclusive."):
task_command.task_run(
self.parser.parse_args(
[
"tasks",
"run",
"example_bash_operator",
"runme_0",
DEFAULT_DATE.isoformat(),
"--raw",
"--local",
]
)
)
def test_task_render(self):
"""
tasks render should render and display templated fields for a given task
"""
with redirect_stdout(StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(["tasks", "render", "tutorial", "templated", "2016-01-01"])
)
output = stdout.getvalue()
assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output
def test_mapped_task_render(self):
"""
tasks render should render and display templated fields for a given mapped task
"""
with redirect_stdout(StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(
[
"tasks",
"render",
"test_mapped_classic",
"consumer_literal",
"2022-01-01",
"--map-index",
"0",
]
)
)
# the dag test_mapped_classic has op_args=[[1], [2], [3]], so the first mapped task instance
# should have op_args=[1]
output = stdout.getvalue()
assert "[1]" in output
assert "[2]" not in output
assert "[3]" not in output
assert "property: op_args" in output
def test_mapped_task_render_with_template(self, dag_maker):
"""
tasks render should render and display templated fields for a given mapped task
"""
with dag_maker() as dag:
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
commands = [templated_command, "echo 1"]
BashOperator.partial(task_id="some_command").expand(bash_command=commands)
with redirect_stdout(StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(
[
"tasks",
"render",
"test_dag",
"some_command",
"2022-01-01",
"--map-index",
"0",
]
),
dag=dag,
)
output = stdout.getvalue()
assert 'echo "2022-01-01"' in output
assert 'echo "2022-01-08"' in output
@mock.patch("airflow.cli.commands.task_command.select")
@mock.patch("sqlalchemy.orm.session.Session.scalars")
@mock.patch("airflow.cli.commands.task_command.DagRun")
def test_task_render_with_custom_timetable(self, mock_dagrun, mock_scalars, mock_select):
"""
When calling `tasks render` on a dag with a custom timetable, the DagRun object should be
created with a data_interval.
"""
mock_scalars.side_effect = sqlalchemy.exc.NoResultFound
task_command.task_render(
self.parser.parse_args(["tasks", "render", "example_workday_timetable", "run_this", "2022-01-01"])
)
assert "data_interval" in mock_dagrun.call_args.kwargs
def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
"""
tasks run should raise an AirflowException when an invalid pickle_id is passed
"""
pickle_id = "pickle_id"
with pytest.raises(
AirflowException,
match=re.escape("You cannot use the --pickle option when using DAG.cli() method."),
):
task_command.task_run(
self.parser.parse_args(
[
"tasks",
"run",
"example_bash_operator",
"runme_0",
DEFAULT_DATE.isoformat(),
"--pickle",
pickle_id,
]
),
self.dag,
)
def test_task_state(self):
task_command.task_state(
self.parser.parse_args(
["tasks", "state", self.dag_id, "print_the_context", DEFAULT_DATE.isoformat()]
)
)
def test_task_states_for_dag_run(self):
dag2 = DagBag().dags["example_python_operator"]
task2 = dag2.get_task(task_id="print_the_context")
default_date2 = timezone.datetime(2016, 1, 9)
dag2.clear()
data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2)
dagrun = dag2.create_dagrun(
state=State.RUNNING,
execution_date=default_date2,
data_interval=data_interval,
run_type=DagRunType.MANUAL,
external_trigger=True,
)
ti2 = TaskInstance(task2, run_id=dagrun.run_id)
ti2.set_state(State.SUCCESS)
ti_start = ti2.start_date
ti_end = ti2.end_date
with redirect_stdout(StringIO()) as stdout:
task_command.task_states_for_dag_run(
self.parser.parse_args(
[
"tasks",
"states-for-dag-run",
"example_python_operator",
default_date2.isoformat(),
"--output",
"json",
]
)
)
actual_out = json.loads(stdout.getvalue())
assert len(actual_out) == 1
assert actual_out[0] == {
"dag_id": "example_python_operator",
"execution_date": "2016-01-09T00:00:00+00:00",
"task_id": "print_the_context",
"state": "success",
"start_date": ti_start.isoformat(),
"end_date": ti_end.isoformat(),
}
def test_task_states_for_dag_run_when_dag_run_not_exists(self):
"""
task_states_for_dag_run should raise DagRunNotFound when an invalid dag id is passed
"""
with pytest.raises(DagRunNotFound):
task_command.task_states_for_dag_run(
self.parser.parse_args(
[
"tasks",
"states-for-dag-run",
"not_exists_dag",
timezone.datetime(2016, 1, 9).isoformat(),
"--output",
"json",
]
)
)
def test_subdag_clear(self):
args = self.parser.parse_args(["tasks", "clear", "example_subdag_operator", "--yes"])
task_command.task_clear(args)
args = self.parser.parse_args(
["tasks", "clear", "example_subdag_operator", "--yes", "--exclude-subdags"]
)
task_command.task_clear(args)
def test_parentdag_downstream_clear(self):
args = self.parser.parse_args(["tasks", "clear", "example_subdag_operator.section-1", "--yes"])
task_command.task_clear(args)
args = self.parser.parse_args(
["tasks", "clear", "example_subdag_operator.section-1", "--yes", "--exclude-parentdag"]
)
task_command.task_clear(args)
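# Mark the task instance QUEUED and pre-increment try_number so that a subsequent
# `tasks run --local` writes to the expected attempt-numbered log file.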
def _set_state_and_try_num(ti, session):
ti.state = TaskInstanceState.QUEUED
ti.try_number += 1
session.commit()
class TestLogsfromTaskRunCommand:
def setup_method(self) -> None:
self.dag_id = "test_logging_dag"
self.task_id = "test_task"
self.run_id = "test_run"
self.dag_path = os.path.join(ROOT_FOLDER, "dags", "test_logging_in_dag.py")
reset(self.dag_id)
self.execution_date = timezone.datetime(2017, 1, 1)
self.execution_date_str = self.execution_date.isoformat()
self.task_args = ["tasks", "run", self.dag_id, self.task_id, "--local", self.execution_date_str]
self.log_dir = conf.get_mandatory_value("logging", "base_log_folder")
self.log_filename = f"dag_id={self.dag_id}/run_id={self.run_id}/task_id={self.task_id}/attempt=1.log"
self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename)
self.parser = cli_parser.get_parser()
dag = DagBag().get_dag(self.dag_id)
data_interval = dag.timetable.infer_manual_data_interval(run_after=self.execution_date)
self.dr = dag.create_dagrun(
run_id=self.run_id,
execution_date=self.execution_date,
data_interval=data_interval,
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.MANUAL,
)
self.tis = self.dr.get_task_instances()
assert len(self.tis) == 1
self.ti = self.tis[0]
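# Snapshot the root logger configuration so teardown_method can restore it after task_run has mutated logging.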
root = self.root_logger = logging.getLogger()
self.root_handlers = root.handlers.copy()
self.root_filters = root.filters.copy()
self.root_level = root.level
with contextlib.suppress(OSError):
os.remove(self.ti_log_file_path)
def teardown_method(self) -> None:
root = self.root_logger
root.setLevel(self.root_level)
root.handlers[:] = self.root_handlers
root.filters[:] = self.root_filters
reset(self.dag_id)
with contextlib.suppress(OSError):
os.remove(self.ti_log_file_path)
def assert_log_line(self, text, logs_list, expect_from_logging_mixin=False):
"""
Get the log line containing ``text`` and assert that exactly one such entry exists. Also check that
the line does not come from "logging_mixin", which would indicate duplicated logging such as:
[2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
"""
log_lines = [log for log in logs_list if text in log]
assert len(log_lines) == 1
log_line = log_lines[0]
if not expect_from_logging_mixin:
# Logs from print statements still show with logging_mixin as the filename
# Example: [2020-06-24 17:07:00,482] {logging_mixin.py:91} INFO - Log from Print statement
assert "logging_mixin.py" not in log_line
return log_line
@mock.patch("airflow.cli.commands.task_command.LocalTaskJobRunner")
def test_external_executor_id_present_for_fork_run_task(self, mock_local_job):
mock_local_job.return_value.job_type = "LocalTaskJob"
args = self.parser.parse_args(self.task_args)
args.external_executor_id = "ABCD12345"
task_command.task_run(args)
mock_local_job.assert_called_once_with(
job=mock.ANY,
task_instance=mock.ANY,
mark_success=False,
pickle_id=None,
ignore_all_deps=False,
ignore_depends_on_past=False,
wait_for_past_depends_before_skipping=False,
ignore_task_deps=False,
ignore_ti_state=False,
pool=None,
external_executor_id="ABCD12345",
)
@mock.patch("airflow.cli.commands.task_command.LocalTaskJobRunner")
def test_external_executor_id_present_for_process_run_task(self, mock_local_job):
mock_local_job.return_value.job_type = "LocalTaskJob"
args = self.parser.parse_args(self.task_args)
args.external_executor_id = "ABCD12345"
with mock.patch.dict(os.environ, {"external_executor_id": "12345FEDCBA"}):
task_command.task_run(args)
mock_local_job.assert_called_once_with(
job=mock.ANY,
task_instance=mock.ANY,
mark_success=False,
pickle_id=None,
ignore_all_deps=False,
ignore_depends_on_past=False,
wait_for_past_depends_before_skipping=False,
ignore_task_deps=False,
ignore_ti_state=False,
pool=None,
external_executor_id="ABCD12345",
)
@pytest.mark.parametrize(
"is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("", "true"), ("", "")]
)
def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec, session):
"""
When running a task with --local as a k8s executor pod, all logging should make it to stdout.
Otherwise, all logging after "running TI" is redirected to logs (and the actual log
file content is tested elsewhere in this module).
Unfortunately, to test stdout we have to run this as a subprocess, because the stdout
redirection and log capturing behavior is not compatible with pytest's own stdout capturing.
Running as a subprocess takes pytest out of the equation and verifies the behavior with certainty.
"""
import subprocess
ti = self.dr.get_task_instances(session=session)[0]
_set_state_and_try_num(ti, session) # so that try_number is correct
with mock.patch.dict(
"os.environ",
AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
AIRFLOW_IS_EXECUTOR_CONTAINER=is_container_exec,
PYTHONPATH=os.fspath(ROOT_FOLDER),
):
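# Run the CLI in a real subprocess (see the docstring above); -S/--subdir points it at the
# DAG file, which lives outside the configured dags folder.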
with subprocess.Popen(
args=[sys.executable, "-m", "airflow", *self.task_args, "-S", self.dag_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as process:
output, err = process.communicate()
if err:
print(err.decode("utf-8"))
lines = []
found_start = False
for line_ in output.splitlines():
print(line_.decode("utf-8"))
line = line_.decode("utf-8")
if "Running <TaskInstance: test_logging_dag.test_task test_run" in line:
found_start = True
if found_start:
lines.append(line)
if any((is_k8s, is_container_exec)):
# 10 is arbitrary, but gives enough padding to hopefully not be flaky
assert len(lines) > 10
self.assert_log_line("Starting attempt 1 of 1", lines)
self.assert_log_line("Exporting env vars", lines)
self.assert_log_line("Log from DAG Logger", lines)
self.assert_log_line("Log from TI Logger", lines)
self.assert_log_line("Log from Print statement", lines, expect_from_logging_mixin=True)
self.assert_log_line("Task exited with return code 0", lines)
else:
# when not k8s executor pod, most output is redirected to logs
assert len(lines) == 1
@pytest.mark.skipif(not hasattr(os, "fork"), reason="Forking not available")
def test_logging_with_run_task(self, session):
ti = self.dr.get_task_instances(session=session)[0]
_set_state_and_try_num(ti, session)
with conf_vars({("core", "dags_folder"): self.dag_path}):
task_command.task_run(self.parser.parse_args(self.task_args))
with open(self.ti_log_file_path) as l_file:
logs = l_file.read()
print(logs) # In case of a test failure, this line shows the detailed log
logs_list = logs.splitlines()
assert "INFO - Started process" in logs
assert f"Subtask {self.task_id}" in logs
assert "standard_task_runner.py" in logs
assert (
f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', "
f"'{self.task_id}', '{self.run_id}'," in logs
)
self.assert_log_line("Log from DAG Logger", logs_list)
self.assert_log_line("Log from TI Logger", logs_list)
self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)
@pytest.mark.skipif(not hasattr(os, "fork"), reason="Forking not available")
def test_run_task_with_pool(self):
pool_name = "test_pool_run"
clear_db_pools()
with create_session() as session:
pool = Pool(pool=pool_name, slots=1, include_deferred=False)
session.add(pool)
session.commit()
assert session.query(TaskInstance).filter_by(pool=pool_name).first() is None
task_command.task_run(self.parser.parse_args([*self.task_args, "--pool", pool_name]))
assert session.query(TaskInstance).filter_by(pool=pool_name).first() is not None
session.delete(pool)
session.commit()
@mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
def test_logging_with_run_task_subprocess(self, session):
ti = self.dr.get_task_instances(session=session)[0]
_set_state_and_try_num(ti, session)
with conf_vars({("core", "dags_folder"): self.dag_path}):
task_command.task_run(self.parser.parse_args(self.task_args))
with open(self.ti_log_file_path) as l_file:
logs = l_file.read()
print(logs) # In case of a test failure, this line shows the detailed log
logs_list = logs.splitlines()
assert f"Subtask {self.task_id}" in logs
assert "base_task_runner.py" in logs
self.assert_log_line("Log from DAG Logger", logs_list)
self.assert_log_line("Log from TI Logger", logs_list)
self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs
assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)
def test_log_file_template_with_run_task(self, session):
"""Verify that the taskinstance has the right context for log_filename_template"""
with conf_vars({("core", "dags_folder"): self.dag_path}):
# increment the try_number of the task to be run
with create_session() as session:
ti = session.query(TaskInstance).filter_by(run_id=self.run_id).first()
ti.try_number = 2
log_file_path = os.path.join(os.path.dirname(self.ti_log_file_path), "attempt=2.log")
try:
task_command.task_run(self.parser.parse_args(self.task_args))
assert os.path.exists(log_file_path)
finally:
with contextlib.suppress(OSError):
os.remove(log_file_path)
@mock.patch.object(task_command, "_run_task_by_selected_method")
def test_root_logger_restored(self, run_task_mock, caplog):
"""Verify that the root logging context is restored"""
logger = logging.getLogger("foo.bar")
def task_inner(*args, **kwargs):
logger.warning("redirected log message")
run_task_mock.side_effect = task_inner
config = {
("core", "dags_folder"): self.dag_path,
("logging", "logging_level"): "INFO",
}
with caplog.at_level(level=logging.WARNING):
with conf_vars(config):
logger.warning("not redirected")
task_command.task_run(self.parser.parse_args(self.task_args))
assert "not redirected" in caplog.text
assert self.root_logger.level == logging.WARNING
assert self.root_logger.handlers == self.root_handlers
@mock.patch.object(task_command, "_run_task_by_selected_method")
@pytest.mark.parametrize("do_not_modify_handler", [True, False])
def test_disable_handler_modifying(self, run_task_mock, caplog, do_not_modify_handler):
"""If [core] donot_modify_handlers is set to True, the root logger is untouched"""
from airflow import settings
logger = logging.getLogger("foo.bar")
def task_inner(*args, **kwargs):
logger.warning("not redirected")
run_task_mock.side_effect = task_inner
config = {
("core", "dags_folder"): self.dag_path,
("logging", "logging_level"): "INFO",
}
with caplog.at_level(logging.WARNING, logger="foo.bar"):
with conf_vars(config):
old_value = settings.DONOT_MODIFY_HANDLERS
settings.DONOT_MODIFY_HANDLERS = do_not_modify_handler
try:
task_command.task_run(self.parser.parse_args(self.task_args))
if do_not_modify_handler:
assert "not redirected" in caplog.text
else:
assert "not redirected" not in caplog.text
finally:
settings.DONOT_MODIFY_HANDLERS = old_value
def test_context_with_run():
dag_id = "test_parsing_context"
task_id = "task1"
run_id = "test_run"
dag_path = os.path.join(ROOT_FOLDER, "dags", "test_parsing_context.py")
reset(dag_id)
execution_date = timezone.datetime(2017, 1, 1)
execution_date_str = execution_date.isoformat()
task_args = ["tasks", "run", dag_id, task_id, "--local", execution_date_str]
parser = cli_parser.get_parser()
dag = DagBag().get_dag(dag_id)
data_interval = dag.timetable.infer_manual_data_interval(run_after=execution_date)
dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
data_interval=data_interval,
start_date=timezone.utcnow(),
state=State.RUNNING,
run_type=DagRunType.MANUAL,
)
with conf_vars({("core", "dags_folder"): dag_path}):
task_command.task_run(parser.parse_args(task_args))
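# The task in test_parsing_context.py is expected to record the parsing context it observed to this well-known path.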
context_file = Path("/tmp/airflow_parsing_context")
text = context_file.read_text()
assert (
text == "_AIRFLOW_PARSING_CONTEXT_DAG_ID=test_parsing_context\n"
"_AIRFLOW_PARSING_CONTEXT_TASK_ID=task1\n"
)
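# LoggerMutationHelper snapshots a logger's handlers, level and propagate flag so they can be applied
# to another logger (apply/move) and later restored (reset); the tests below cover each path.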
class TestLoggerMutationHelper:
@pytest.mark.parametrize("target_name", ["test_apply_target", None])
def test_apply(self, target_name):
"""
Handlers, level and propagate should be applied to the target.
"""
src = logging.getLogger(f"test_apply_source_{target_name}")
src.propagate = False
src.addHandler(sentinel.handler)
src.setLevel(-1)
obj = LoggerMutationHelper(src)
tgt = logging.getLogger("test_apply_target")
obj.apply(tgt)
assert tgt.handlers == [sentinel.handler]
assert tgt.propagate is False if target_name else True # root propagate unchanged
assert tgt.level == -1
def test_apply_no_replace(self):
"""
With replace=False, source handlers should be appended to the target without replacing its existing handlers.
"""
src = logging.getLogger("test_apply_source_no_repl")
tgt = logging.getLogger("test_apply_target_no_repl")
h1 = logging.Handler()
h1.name = "h1"
h2 = logging.Handler()
h2.name = "h2"
h3 = logging.Handler()
h3.name = "h3"
src.handlers[:] = [h1, h2]
tgt.handlers[:] = [h2, h3]
LoggerMutationHelper(src).apply(tgt, replace=False)
assert tgt.handlers == [h2, h3, h1]
def test_move(self):
"""Move should apply plus remove source handler, set propagate to True"""
src = logging.getLogger("test_move_source")
src.propagate = False
src.addHandler(sentinel.handler)
src.setLevel(-1)
obj = LoggerMutationHelper(src)
tgt = logging.getLogger("test_apply_target")
obj.move(tgt)
assert tgt.handlers == [sentinel.handler]
assert tgt.propagate is False
assert tgt.level == -1
assert src.propagate is True
assert obj.propagate is False
assert src.level == obj.level
assert src.handlers == []
assert obj.handlers == tgt.handlers
def test_reset(self):
src = logging.getLogger("test_move_reset")
src.propagate = True
src.addHandler(sentinel.h1)
src.setLevel(-1)
obj = LoggerMutationHelper(src)
src.propagate = False
src.addHandler(sentinel.h2)
src.setLevel(-2)
obj.reset()
assert src.propagate is True
assert src.handlers == [sentinel.h1]
assert src.level == -1