#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import logging.config
import os
import re
from importlib import reload
from pathlib import Path
from unittest import mock
from unittest.mock import patch
import pendulum
import pytest
from kubernetes.client import models as k8s
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors import executor_loader
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.trigger import Trigger
from airflow.operators.python import PythonOperator
from airflow.utils.log.file_task_handler import (
FileTaskHandler,
LogType,
_interleave_logs,
_parse_timestamps_in_log_file,
)
from airflow.utils.log.logging_mixin import set_context
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
DEFAULT_DATE = datetime(2016, 1, 1)
TASK_LOGGER = "airflow.task"
FILE_TASK_HANDLER = "task"
class TestFileTaskLogHandler:
def clean_up(self):
with create_session() as session:
session.query(DagRun).delete()
session.query(TaskInstance).delete()
def setup_method(self):
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
logging.root.disabled = False
self.clean_up()
# We use file task handler by default.
def teardown_method(self):
self.clean_up()
def test_deprecated_filename_template(self):
with pytest.warns(
RemovedInAirflow3Warning,
match="Passing filename_template to a log handler is deprecated and has no effect",
):
FileTaskHandler("", filename_template="/foo/bar")
def test_default_task_logging_setup(self):
# file task handler is used by default.
logger = logging.getLogger(TASK_LOGGER)
handlers = logger.handlers
assert len(handlers) == 1
handler = handlers[0]
assert handler.name == FILE_TASK_HANDLER
def test_file_task_handler_when_ti_value_is_invalid(self):
def task_callable(ti):
ti.log.info("test")
dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE)
dagrun = dag.create_dagrun(
run_type=DagRunType.MANUAL,
state=State.RUNNING,
execution_date=DEFAULT_DATE,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
)
task = PythonOperator(
task_id="task_for_testing_file_log_handler",
dag=dag,
python_callable=task_callable,
)
ti = TaskInstance(task=task, run_id=dagrun.run_id)
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("0.log"), log_filename
ti.run(ignore_ti_state=True)
file_handler.flush()
file_handler.close()
assert hasattr(file_handler, "read")
# Return value of read must be a tuple of list and list.
        # Pass an invalid `try_number` to the read function.
logs, metadatas = file_handler.read(ti, 0)
assert isinstance(logs, list)
assert isinstance(metadatas, list)
assert len(logs) == 1
assert len(logs) == len(metadatas)
assert isinstance(metadatas[0], dict)
assert logs[0][0][0] == "default_host"
assert logs[0][0][1] == "Error fetching the logs. Try number 0 is invalid."
# Remove the generated tmp log file.
os.remove(log_filename)
def test_file_task_handler(self):
def task_callable(ti):
ti.log.info("test")
dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE)
dagrun = dag.create_dagrun(
run_type=DagRunType.MANUAL,
state=State.RUNNING,
execution_date=DEFAULT_DATE,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
)
task = PythonOperator(
task_id="task_for_testing_file_log_handler",
dag=dag,
python_callable=task_callable,
)
ti = TaskInstance(task=task, run_id=dagrun.run_id)
ti.try_number += 1
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("1.log"), log_filename
ti.run(ignore_ti_state=True)
file_handler.flush()
file_handler.close()
assert hasattr(file_handler, "read")
# Return value of read must be a tuple of list and list.
logs, metadatas = file_handler.read(ti)
assert isinstance(logs, list)
assert isinstance(metadatas, list)
assert len(logs) == 1
assert len(logs) == len(metadatas)
assert isinstance(metadatas[0], dict)
target_re = r"\n\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\n"
# We should expect our log line from the callable above to appear in
# the logs we read back
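        # (e.g. a line like "[<timestamp>] {test_log_handlers.py:<lineno>} INFO - test"
        # would satisfy target_re).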
assert re.search(target_re, logs[0][0][-1]), "Logs were " + str(logs)
# Remove the generated tmp log file.
os.remove(log_filename)
def test_file_task_handler_running(self):
def task_callable(ti):
ti.log.info("test")
dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE)
task = PythonOperator(
task_id="task_for_testing_file_log_handler",
python_callable=task_callable,
dag=dag,
)
dagrun = dag.create_dagrun(
run_type=DagRunType.MANUAL,
state=State.RUNNING,
execution_date=DEFAULT_DATE,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
)
ti = TaskInstance(task=task, run_id=dagrun.run_id)
ti.try_number = 2
ti.state = State.RUNNING
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("2.log"), log_filename
logger.info("Test")
# Return value of read must be a tuple of list and list.
logs, metadatas = file_handler.read(ti)
        assert isinstance(logs, list)
        assert isinstance(metadatas, list)
        # Logs for running tasks should show up too.
        assert len(logs) == 2
assert len(logs) == len(metadatas)
assert isinstance(metadatas[0], dict)
# Remove the generated tmp log file.
os.remove(log_filename)
@patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
def test__read_when_local(self, mock_read_local, create_task_instance):
"""
        Test that when a local log file exists, the values returned from _read_from_local
        are incorporated into the returned log.
"""
path = Path(
"dag_id=dag_for_testing_local_log_read/run_id=scheduled__2016-01-01T00:00:00+00:00/task_id=task_for_testing_local_log_read/attempt=1.log"
)
mock_read_local.return_value = (["the messages"], ["the log"])
local_log_file_read = create_task_instance(
dag_id="dag_for_testing_local_log_read",
task_id="task_for_testing_local_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
fth = FileTaskHandler("")
actual = fth._read(ti=local_log_file_read, try_number=1)
mock_read_local.assert_called_with(path)
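        # _read is expected to prefix the returned message with "*** " and append the log
        # body; log_pos (7) matches len("the log"), the length of that body.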
assert actual == ("*** the messages\nthe log", {"end_of_log": True, "log_pos": 7})
def test__read_from_local(self, tmp_path):
"""Tests the behavior of method _read_from_local"""
path1 = tmp_path / "hello1.log"
path2 = tmp_path / "hello1.log.suffix.log"
path1.write_text("file1 content")
path2.write_text("file2 content")
fth = FileTaskHandler("")
assert fth._read_from_local(path1) == (
[
"Found local files:",
f" * {path1}",
f" * {path2}",
],
["file1 content", "file2 content"],
)
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log"
)
@pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
"""Test for k8s executor, the log is read from get_task_log method"""
mock_k8s_get_task_log.return_value = ([], [])
executor_name = "KubernetesExecutor"
ti = create_task_instance(
dag_id="dag_for_testing_k8s_executor_log_read",
task_id="task_for_testing_k8s_executor_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
ti.state = state
ti.triggerer_job = None
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
fth._read(ti=ti, try_number=2)
if state == TaskInstanceState.RUNNING:
mock_k8s_get_task_log.assert_called_once_with(ti, 2)
else:
mock_k8s_get_task_log.assert_not_called()
def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
"""Test for executors which do not have `get_task_log` method, it fallbacks to reading
log from worker if and only if remote logs aren't found"""
executor_name = "CeleryExecutor"
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
ti.state = TaskInstanceState.RUNNING
ti.try_number = 2
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=2)
fth._read_from_logs_server.assert_called_once()
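            # The task is still RUNNING for this try, so end_of_log stays False;
            # log_pos (16) matches len("this\nlog\ncontent").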
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16})
# Previous try_number should return served logs when remote logs aren't implemented
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["served logs try_number=1"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=1)
fth._read_from_logs_server.assert_called_once()
assert actual == (
"*** served logs try_number=1\nthis\nlog\ncontent",
{"end_of_log": True, "log_pos": 16},
)
            # When remote logging is implemented, logs for a previous try_number come from
            # remote storage without contacting the worker's log server.
fth._read_from_logs_server.reset_mock()
fth._read_remote_logs = mock.Mock()
fth._read_remote_logs.return_value = ["remote logs"], ["remote\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=1)
fth._read_remote_logs.assert_called_once()
fth._read_from_logs_server.assert_not_called()
assert actual == ("*** remote logs\nremote\nlog\ncontent", {"end_of_log": True, "log_pos": 18})
@pytest.mark.parametrize(
"remote_logs, local_logs, served_logs_checked",
[
(True, True, False),
(True, False, False),
(False, True, False),
(False, False, True),
],
)
def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
self, create_task_instance, remote_logs, local_logs, served_logs_checked
):
"""
        Generally speaking, when a task is done we should not read from the logs server,
        because for log persistence we assume users will either set up a shared drive or
        enable remote logging. But if they don't do that, and we therefore find neither
        remote nor local logs, we check the worker for served logs as a fallback.
"""
executor_name = "CeleryExecutor"
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
        ti.state = TaskInstanceState.SUCCESS  # we're testing the scenario where the task is done
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
if remote_logs:
fth._read_remote_logs = mock.Mock()
fth._read_remote_logs.return_value = ["found remote logs"], ["remote\nlog\ncontent"]
if local_logs:
fth._read_from_local = mock.Mock()
fth._read_from_local.return_value = ["found local logs"], ["local\nlog\ncontent"]
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
actual = fth._read(ti=ti, try_number=1)
if served_logs_checked:
fth._read_from_logs_server.assert_called_once()
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
else:
fth._read_from_logs_server.assert_not_called()
assert actual[0]
assert actual[1]
@pytest.mark.parametrize(
"pod_override, namespace_to_call",
[
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-A")), "namespace-A"),
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-B")), "namespace-B"),
pytest.param(k8s.V1Pod(), "default"),
pytest.param(None, "default"),
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
],
)
@patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
@patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_read_from_k8s_under_multi_namespace_mode(
self, mock_kube_client, pod_override, namespace_to_call
):
mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
def task_callable(ti):
ti.log.info("test")
with DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE) as dag:
task = PythonOperator(
task_id="task_for_testing_file_log_handler",
python_callable=task_callable,
executor_config={"pod_override": pod_override},
)
dagrun = dag.create_dagrun(
run_type=DagRunType.MANUAL,
state=State.RUNNING,
execution_date=DEFAULT_DATE,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
)
ti = TaskInstance(task=task, run_id=dagrun.run_id)
ti.try_number = 3
logger = ti.log
ti.log.disabled = False
file_handler = next((h for h in logger.handlers if h.name == FILE_TASK_HANDLER), None)
set_context(logger, ti)
ti.run(ignore_ti_state=True)
ti.state = TaskInstanceState.RUNNING
file_handler.read(ti, 2)
        # First, the handler looks up the pod name.
mock_list_pod.assert_called_once()
actual_kwargs = mock_list_pod.call_args.kwargs
assert actual_kwargs["namespace"] == namespace_to_call
actual_selector = actual_kwargs["label_selector"]
assert re.match(
(
"airflow_version=.+?,"
"dag_id=dag_for_testing_file_task_handler,"
"kubernetes_executor=True,"
"run_id=manual__2016-01-01T0000000000-2b88d1d57,"
"task_id=task_for_testing_file_log_handler,"
"try_number=2,"
"airflow-worker"
),
actual_selector,
)
        # Then it reads the log from that pod.
mock_read_log.assert_called_once_with(
name=mock_list_pod.return_value.items[0].metadata.name,
namespace=namespace_to_call,
container="base",
follow=False,
tail_lines=100,
_preload_content=False,
)
def test_add_triggerer_suffix(self):
sample = "any/path/to/thing.txt"
assert FileTaskHandler.add_triggerer_suffix(sample) == sample + ".trigger"
assert FileTaskHandler.add_triggerer_suffix(sample, job_id=None) == sample + ".trigger"
assert FileTaskHandler.add_triggerer_suffix(sample, job_id=123) == sample + ".trigger.123.log"
assert FileTaskHandler.add_triggerer_suffix(sample, job_id="123") == sample + ".trigger.123.log"
@pytest.mark.parametrize("is_a_trigger", [True, False])
def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, session, tmp_path):
create_dummy_dag(dag_id="test_fth", task_id="dummy")
(ti,) = dag_maker.create_dagrun(execution_date=pendulum.datetime(2023, 1, 1, tz="UTC")).task_instances
assert isinstance(ti, TaskInstance)
if is_a_trigger:
ti.is_trigger_log_context = True
job = Job()
t = Trigger("", {})
t.triggerer_job = job
session.add(t)
ti.triggerer = t
t.task_instance = ti
h = FileTaskHandler(base_log_folder=os.fspath(tmp_path))
h.set_context(ti)
expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=0.log"
if is_a_trigger:
expected += f".trigger.{job.id}.log"
actual = h.handler.baseFilename
assert actual == os.fspath(tmp_path / expected)
class TestFilenameRendering:
def test_python_formatting(self, create_log_template, create_task_instance):
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
f"{DEFAULT_DATE.isoformat()}/42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
def test_jinja_rendering(self, create_log_template, create_task_instance):
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
f"{DEFAULT_DATE.isoformat()}/42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
class TestLogUrl:
def test_log_retrieval_valid(self, create_task_instance):
log_url_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
log_url_ti.hostname = "hostname"
actual = FileTaskHandler("")._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
assert actual == ("http://hostname:8793/log/DYNAMIC_PATH", "DYNAMIC_PATH")
def test_log_retrieval_valid_trigger(self, create_task_instance):
ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)
ti.hostname = "hostname"
trigger = Trigger("", {})
job = Job(TriggererJobRunner.job_type)
job.id = 123
trigger.triggerer_job = job
ti.trigger = trigger
actual = FileTaskHandler("")._get_log_retrieval_url(ti, "DYNAMIC_PATH", log_type=LogType.TRIGGER)
hostname = get_hostname()
assert actual == (
f"http://{hostname}:8794/log/DYNAMIC_PATH.trigger.123.log",
"DYNAMIC_PATH.trigger.123.log",
)
log_sample = """[2022-11-16T00:05:54.278-0800] {taskinstance.py:1257} INFO -
--------------------------------------------------------------------------------
[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1
[2022-11-16T00:05:54.279-0800] {taskinstance.py:1259} INFO -
--------------------------------------------------------------------------------
[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00
[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']
[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait
[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan
[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=simple_async_timedelta
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2022-11-16T08:05:52.324532+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00
[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554
"""
def test_parse_timestamps():
actual = []
for timestamp, _, _ in _parse_timestamps_in_log_file(log_sample.splitlines()):
actual.append(timestamp)
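    # Continuation lines without their own timestamp (the AIRFLOW_CTX_* lines above) are
    # expected to inherit the timestamp of the preceding timestamped line, which is why
    # 00:05:54.592 repeats below.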
assert actual == [
pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
pendulum.parse("2022-11-16T00:05:54.279000-08:00"),
pendulum.parse("2022-11-16T00:05:54.279000-08:00"),
pendulum.parse("2022-11-16T00:05:54.295000-08:00"),
pendulum.parse("2022-11-16T00:05:54.300000-08:00"),
pendulum.parse("2022-11-16T00:05:54.300000-08:00"), # duplicate
pendulum.parse("2022-11-16T00:05:54.300000-08:00"), # duplicate
pendulum.parse("2022-11-16T00:05:54.306000-08:00"),
pendulum.parse("2022-11-16T00:05:54.309000-08:00"),
pendulum.parse("2022-11-16T00:05:54.457000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.604000-08:00"),
]
def test_interleave_interleaves():
log_sample1 = "\n".join(
[
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
]
)
log_sample2 = "\n".join(
[
"[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']",
"[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",
]
)
log_sample3 = "\n".join(
[
"[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",
"[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow",
"AIRFLOW_CTX_DAG_ID=simple_async_timedelta",
"AIRFLOW_CTX_TASK_ID=wait",
"AIRFLOW_CTX_EXECUTION_DATE=2022-11-16T08:05:52.324532+00:00",
"AIRFLOW_CTX_TRY_NUMBER=1",
"AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00",
"[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",
]
)
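    # _interleave_logs is expected to merge the sources by timestamp and drop exact
    # duplicates, so the thrice-repeated "Started process" line from log_sample2 appears
    # only once in the expected output.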
expected = "\n".join(
[
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
"[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']",
"[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",
"[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",
"[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow",
"AIRFLOW_CTX_DAG_ID=simple_async_timedelta",
"AIRFLOW_CTX_TASK_ID=wait",
"AIRFLOW_CTX_EXECUTION_DATE=2022-11-16T08:05:52.324532+00:00",
"AIRFLOW_CTX_TRY_NUMBER=1",
"AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00",
"[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",
]
)
assert "\n".join(_interleave_logs(log_sample2, log_sample1, log_sample3)) == expected
long_sample = """
*** yoyoyoyo
[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1
[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2023-01-16 06:36:43.044492+00:00
[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task
[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging']
[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait
[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [running]> on host daniels-mbp-2.lan
[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_EXECUTION_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00'
[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646
[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral)
[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1
[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2023-01-16 06:36:43.044492+00:00
[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task
[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging']
[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait
[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [running]> on host daniels-mbp-2.lan
[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_EXECUTION_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00'
[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646
[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral)
[2023-01-15T22:37:17.673-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
[2023-01-15T22:37:17.681-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
[2023-01-15T22:37:17.682-0800] {taskinstance.py:1330} INFO - resuming after deferral
[2023-01-15T22:37:17.693-0800] {taskinstance.py:1351} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2023-01-16 06:36:43.044492+00:00
[2023-01-15T22:37:17.697-0800] {standard_task_runner.py:56} INFO - Started process 39090 to run task
[2023-01-15T22:37:17.703-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '488', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp_sa9sau4', '--no-shut-down-logging']
[2023-01-15T22:37:17.707-0800] {standard_task_runner.py:84} INFO - Job 488: Subtask wait
[2023-01-15T22:37:17.771-0800] {task_command.py:417} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [running]> on host daniels-mbp-2.lan
[2023-01-15T22:37:18.043-0800] {taskinstance.py:1369} INFO - Marking task as SUCCESS. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646, end_date=20230116T063718
[2023-01-15T22:37:18.117-0800] {local_task_job.py:220} INFO - Task exited with return code 0
[2023-01-15T22:37:18.147-0800] {taskinstance.py:2648} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2023-01-15T22:37:18.173-0800] {:0} Level None - end_of_log
*** hihihi!
[2023-01-15T22:36:48.348-0800] {temporal.py:62} INFO - trigger starting
[2023-01-15T22:36:48.348-0800] {temporal.py:66} INFO - 24 seconds remaining; sleeping 10 seconds
[2023-01-15T22:36:58.349-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:36:59.349-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:00.349-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:01.350-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:02.350-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:03.351-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:04.351-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:05.353-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:06.354-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:07.355-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:08.356-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:09.357-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:10.358-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:11.359-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:12.359-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-15T22:37:13.360-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 16, 6, 37, 13, 44492, tzinfo=Timezone('UTC'))
[2023-01-15T22:37:13.361-0800] {triggerer_job.py:540} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2023-01-16T06:37:13.044492+00:00> (ID 106) fired: TriggerEvent<DateTime(2023, 1, 16, 6, 37, 13, 44492, tzinfo=Timezone('UTC'))>
"""
def test_interleave_logs_correct_ordering():
"""
    Notice there are two messages with the timestamp `2023-01-17T12:47:11.883-0800`.
    In this case they should appear in the correct order and be deduped in the result.
"""
sample_with_dupe = """[2023-01-17T12:46:55.868-0800] {temporal.py:62} INFO - trigger starting
[2023-01-17T12:46:55.868-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-17T12:47:09.882-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-17T12:47:10.882-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-17T12:47:11.883-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))
[2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2023-01-17T20:47:11.254388+00:00> (ID 1) fired: TriggerEvent<DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))>
"""
assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe))
def test_permissions_for_new_directories(tmp_path):
    # Set umask to 0o027: owner keeps rwx, group loses write, other loses all permissions.
old_umask = os.umask(0o027)
try:
base_dir = tmp_path / "base"
base_dir.mkdir()
log_dir = base_dir / "subdir1" / "subdir2"
        # Force permissions for the new folders to be owner rwx, group ---, other ---.
        new_folder_permissions = 0o700
        # Default permissions (with the umask applied) are owner rwx, group r-x, other ---.
        default_permissions = 0o750
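        # i.e. 0o777 & ~0o027 == 0o750 for directories created without an explicit mode.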
FileTaskHandler._prepare_log_folder(log_dir, new_folder_permissions)
assert log_dir.exists()
assert log_dir.is_dir()
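        # st_mode % 0o1000 keeps only the lower permission bits (rwx for owner/group/other).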
assert log_dir.stat().st_mode % 0o1000 == new_folder_permissions
assert log_dir.parent.stat().st_mode % 0o1000 == new_folder_permissions
assert base_dir.stat().st_mode % 0o1000 == default_permissions
finally:
os.umask(old_umask)