#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import itertools
import logging
import logging.config
import os
import re
from collections.abc import Iterable
from http import HTTPStatus
from importlib import reload
from pathlib import Path
from unittest import mock
from unittest.mock import patch
import pendulum
import pendulum.tz
import pytest
from pydantic import TypeAdapter
from pydantic.v1.utils import deep_update
from requests.adapters import Response
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.executors import executor_constants, executor_loader
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.trigger import Trigger
from airflow.providers.standard.operators.python import PythonOperator
from airflow.utils.log.file_task_handler import (
FileTaskHandler,
LogType,
StructuredLogMessage,
_fetch_logs_from_service,
_interleave_logs,
_parse_log_lines,
)
from airflow.utils.log.logging_mixin import set_context
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
from tests_common.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
DEFAULT_DATE = datetime(2016, 1, 1)
TASK_LOGGER = "airflow.task"
FILE_TASK_HANDLER = "task"
def events(logs: Iterable[StructuredLogMessage], skip_source_info=True) -> list[str]:
"""Helper function to return just the event (a.k.a message) from a list of StructuredLogMessage"""
logs = iter(logs)
if skip_source_info:
def is_source_group(log: StructuredLogMessage):
return not hasattr(log, "timestamp") or log.event == "::endgroup"
logs = itertools.dropwhile(is_source_group, logs)
return [s.event for s in logs]
class TestFileTaskLogHandler:
def clean_up(self):
with create_session() as session:
session.query(DagRun).delete()
session.query(TaskInstance).delete()
def setup_method(self):
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
logging.root.disabled = False
self.clean_up()
# We use file task handler by default.
def teardown_method(self):
self.clean_up()
def test_default_task_logging_setup(self):
# file task handler is used by default.
logger = logging.getLogger(TASK_LOGGER)
handlers = logger.handlers
assert len(handlers) == 1
handler = handlers[0]
assert handler.name == FILE_TASK_HANDLER
def test_file_task_handler_when_ti_value_is_invalid(self, dag_maker):
def task_callable(ti):
ti.log.info("test")
with dag_maker("dag_for_testing_file_task_handler", schedule=None):
task = PythonOperator(
task_id="task_for_testing_file_log_handler",
python_callable=task_callable,
)
dagrun = dag_maker.create_dagrun()
ti = TaskInstance(task=task, run_id=dagrun.run_id)
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("0.log"), log_filename
ti.run(ignore_ti_state=True)
file_handler.flush()
file_handler.close()
assert hasattr(file_handler, "read")
        # read() returns a tuple of (log messages, metadata dict).
        # Pass an invalid `try_number` to read(); it should produce an error event.
log, metadata = file_handler.read(ti, 0)
assert isinstance(metadata, dict)
assert log[0].event == "Error fetching the logs. Try number 0 is invalid."
# Remove the generated tmp log file.
os.remove(log_filename)
def test_file_task_handler(self, dag_maker, session):
def task_callable(ti):
ti.log.info("test")
with dag_maker("dag_for_testing_file_task_handler", schedule=None, session=session):
PythonOperator(
task_id="task_for_testing_file_log_handler",
python_callable=task_callable,
)
dagrun = dag_maker.create_dagrun()
(ti,) = dagrun.get_task_instances(session=session)
ti.try_number += 1
session.flush()
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("1.log"), log_filename
ti.run(ignore_ti_state=True)
file_handler.flush()
file_handler.close()
assert hasattr(file_handler, "read")
log, metadata = file_handler.read(ti, 1)
assert isinstance(metadata, dict)
target_re = re.compile(r"\A\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\Z")
# We should expect our log line from the callable above to appear in
# the logs we read back
assert any(re.search(target_re, e) for e in events(log)), "Logs were " + str(log)
# Remove the generated tmp log file.
os.remove(log_filename)
@pytest.mark.parametrize(
"executor_name",
[
(executor_constants.KUBERNETES_EXECUTOR),
(None),
],
)
@conf_vars(
{
("core", "EXECUTOR"): executor_constants.KUBERNETES_EXECUTOR,
}
)
@patch(
"airflow.executors.executor_loader.ExecutorLoader.load_executor",
wraps=executor_loader.ExecutorLoader.load_executor,
)
@patch(
"airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
wraps=executor_loader.ExecutorLoader.get_default_executor,
)
def test_file_task_handler_with_multiple_executors(
self,
mock_get_default_executor,
mock_load_executor,
executor_name,
create_task_instance,
clean_executor_loader,
):
executors_mapping = executor_loader.ExecutorLoader.executors
default_executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
path_to_executor_class: str
if executor_name is None:
path_to_executor_class = executors_mapping.get(default_executor_name.alias)
else:
path_to_executor_class = executors_mapping.get(executor_name)
with patch(f"{path_to_executor_class}.get_task_log", return_value=([], [])) as mock_get_task_log:
mock_get_task_log.return_value = ([], [])
ti = create_task_instance(
dag_id="dag_for_testing_multiple_executors",
task_id="task_for_testing_multiple_executors",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
if executor_name is not None:
ti.executor = executor_name
ti.try_number = 1
ti.state = TaskInstanceState.RUNNING
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
# clear executor_instances cache
file_handler.executor_instances = {}
assert file_handler.handler is not None
            # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("1.log"), log_filename
file_handler.flush()
file_handler.close()
assert hasattr(file_handler, "read")
file_handler.read(ti)
os.remove(log_filename)
mock_get_task_log.assert_called_once()
if executor_name is None:
mock_get_default_executor.assert_called_once()
# will be called in `ExecutorLoader.get_default_executor` method
mock_load_executor.assert_called_once_with(default_executor_name)
else:
mock_get_default_executor.assert_not_called()
mock_load_executor.assert_called_once_with(executor_name)
def test_file_task_handler_running(self, dag_maker):
def task_callable(ti):
ti.log.info("test")
with dag_maker("dag_for_testing_file_task_handler", schedule=None):
task = PythonOperator(
task_id="task_for_testing_file_log_handler",
python_callable=task_callable,
)
dagrun = dag_maker.create_dagrun()
ti = TaskInstance(task=task, run_id=dagrun.run_id)
ti.try_number = 2
ti.state = State.RUNNING
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("2.log"), log_filename
logger.info("Test")
        # read() returns a tuple of (logs list, metadata dict).
logs, metadata = file_handler.read(ti)
assert isinstance(logs, list)
# Logs for running tasks should show up too.
assert isinstance(metadata, dict)
# Remove the generated tmp log file.
os.remove(log_filename)
def test_file_task_handler_rotate_size_limit(self, dag_maker):
def reset_log_config(update_conf):
import logging.config
logging_config = DEFAULT_LOGGING_CONFIG
logging_config = deep_update(logging_config, update_conf)
logging.config.dictConfig(logging_config)
def task_callable(ti):
pass
max_bytes_size = 60000
update_conf = {"handlers": {"task": {"max_bytes": max_bytes_size, "backup_count": 1}}}
reset_log_config(update_conf)
with dag_maker("dag_for_testing_file_task_handler_rotate_size_limit"):
task = PythonOperator(
task_id="task_for_testing_file_log_handler_rotate_size_limit",
python_callable=task_callable,
)
dagrun = dag_maker.create_dagrun()
ti = TaskInstance(task=task, run_id=dagrun.run_id)
ti.try_number = 1
ti.state = State.RUNNING
logger = ti.log
ti.log.disabled = False
file_handler = next(
(handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
)
assert file_handler is not None
set_context(logger, ti)
assert file_handler.handler is not None
        # We expect set_context to generate a log file locally; this is the first log file
        # in this test, and a second (rotated) file should exist by the time logging finishes.
log_filename = file_handler.handler.baseFilename
assert os.path.isfile(log_filename)
assert log_filename.endswith("1.log"), log_filename
        # Emit roughly 2000 log lines so the total size exceeds max_bytes_size and forces a rollover.
for i in range(1, 2000):
logger.info("this is a Test. %s", i)
        # this is the rotated log file
log_rotate_1_name = log_filename + ".1"
assert os.path.isfile(log_rotate_1_name)
current_file_size = os.path.getsize(log_filename)
rotate_file_1_size = os.path.getsize(log_rotate_1_name)
assert rotate_file_1_size > max_bytes_size * 0.9
assert rotate_file_1_size < max_bytes_size
assert current_file_size < max_bytes_size
        # read() returns a tuple of (logs list, metadata dict).
logs, metadata = file_handler.read(ti)
        # The log content should reference both the current log file and the rotated log file.
find_current_log = False
find_rotate_log_1 = False
for log in logs:
if log_filename in str(log):
find_current_log = True
if log_rotate_1_name in str(log):
find_rotate_log_1 = True
assert find_current_log is True
assert find_rotate_log_1 is True
# Logs for running tasks should show up too.
assert isinstance(logs, list)
# Remove the two generated tmp log files.
os.remove(log_filename)
os.remove(log_rotate_1_name)
@patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
def test__read_when_local(self, mock_read_local, create_task_instance):
"""
        Test that when a local log file exists, the values returned from _read_from_local
        are incorporated into the returned log.
"""
path = Path(
"dag_id=dag_for_testing_local_log_read/run_id=scheduled__2016-01-01T00:00:00+00:00/task_id=task_for_testing_local_log_read/attempt=1.log"
)
mock_read_local.return_value = (["the messages"], ["the log"])
local_log_file_read = create_task_instance(
dag_id="dag_for_testing_local_log_read",
task_id="task_for_testing_local_log_read",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
fth = FileTaskHandler("")
logs, metadata = fth._read(ti=local_log_file_read, try_number=1)
mock_read_local.assert_called_with(path)
as_text = events(logs)
assert logs[0].sources == ["the messages"]
assert as_text[-1] == "the log"
assert metadata == {"end_of_log": True, "log_pos": 1}
def test__read_from_local(self, tmp_path):
"""Tests the behavior of method _read_from_local"""
path1 = tmp_path / "hello1.log"
path2 = tmp_path / "hello1.log.suffix.log"
path1.write_text("file1 content")
path2.write_text("file2 content")
fth = FileTaskHandler("")
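        # Both the exact file and any sibling whose name starts with the same prefix
        # (here the extra ".suffix.log" file) are expected to be returned.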
assert fth._read_from_local(path1) == (
[str(path1), str(path2)],
["file1 content", "file2 content"],
)
@pytest.mark.parametrize(
"remote_logs, local_logs, served_logs_checked",
[
(True, True, False),
(True, False, False),
(False, True, False),
(False, False, True),
],
)
def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
self, create_task_instance, remote_logs, local_logs, served_logs_checked
):
"""
        Generally speaking, when a task is done we should not read from the logs server,
        because for log persistence we assume users will either set up a shared drive or
        enable remote logging. But if they do neither, and we therefore find no remote or
        local logs, we check the worker for served logs as a fallback.
"""
executor_name = "CeleryExecutor"
ti = create_task_instance(
dag_id="dag_for_testing_celery_executor_log_read",
task_id="task_for_testing_celery_executor_log_read",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
ti.state = TaskInstanceState.SUCCESS # we're testing scenario when task is done
with conf_vars({("core", "executor"): executor_name}):
reload(executor_loader)
fth = FileTaskHandler("")
if remote_logs:
fth._read_remote_logs = mock.Mock()
fth._read_remote_logs.return_value = ["found remote logs"], ["remote\nlog\ncontent"]
if local_logs:
fth._read_from_local = mock.Mock()
fth._read_from_local.return_value = ["found local logs"], ["local\nlog\ncontent"]
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
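            # The served-logs endpoint is always mocked so we can assert whether it was
            # consulted as the fallback.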
logs, metadata = fth._read(ti=ti, try_number=1)
if served_logs_checked:
fth._read_from_logs_server.assert_called_once()
assert events(logs) == [
"::group::Log message source details",
"::endgroup::",
"this",
"log",
"content",
]
assert metadata == {"end_of_log": True, "log_pos": 3}
else:
fth._read_from_logs_server.assert_not_called()
assert logs
assert metadata
def test_add_triggerer_suffix(self):
sample = "any/path/to/thing.txt"
assert FileTaskHandler.add_triggerer_suffix(sample) == sample + ".trigger"
assert FileTaskHandler.add_triggerer_suffix(sample, job_id=None) == sample + ".trigger"
assert FileTaskHandler.add_triggerer_suffix(sample, job_id=123) == sample + ".trigger.123.log"
assert FileTaskHandler.add_triggerer_suffix(sample, job_id="123") == sample + ".trigger.123.log"
@pytest.mark.parametrize("is_a_trigger", [True, False])
def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, session, tmp_path):
create_dummy_dag(dag_id="test_fth", task_id="dummy")
(ti,) = dag_maker.create_dagrun(logical_date=pendulum.datetime(2023, 1, 1, tz="UTC")).task_instances
assert isinstance(ti, TaskInstance)
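        # For the trigger case, attach a Trigger backed by a triggerer Job so that
        # set_context routes logs to the ".trigger.<job_id>.log" variant of the file.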
if is_a_trigger:
ti.is_trigger_log_context = True
job = Job()
t = Trigger("", {})
t.triggerer_job = job
session.add(t)
ti.triggerer = t
t.task_instance = ti
h = FileTaskHandler(base_log_folder=os.fspath(tmp_path))
h.set_context(ti)
expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=0.log"
if is_a_trigger:
expected += f".trigger.{job.id}.log"
actual = h.handler.baseFilename
assert actual == os.fspath(tmp_path / expected)
@pytest.mark.parametrize("logical_date", ((None), (DEFAULT_DATE)))
class TestFilenameRendering:
def test_python_formatting(self, create_log_template, create_task_instance, logical_date):
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
run_after=DEFAULT_DATE,
logical_date=logical_date,
catchup=True,
)
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
f"{DEFAULT_DATE.isoformat()}/42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
def test_python_formatting_catchup_false(self, create_log_template, create_task_instance, logical_date):
"""Test the filename rendering with catchup=False (the new default behavior)"""
create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
run_after=DEFAULT_DATE,
logical_date=logical_date,
)
        # With catchup=False:
        # - If logical_date is None, the current date is used as the logical date
        # - If logical_date is explicitly provided, that date is used regardless of the catchup setting
expected_date = logical_date if logical_date is not None else filename_rendering_ti.logical_date
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
f"{expected_date.isoformat()}/42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
def test_jinja_rendering(self, create_log_template, create_task_instance, logical_date):
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
run_after=DEFAULT_DATE,
logical_date=logical_date,
catchup=True,
)
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
f"{DEFAULT_DATE.isoformat()}/42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
def test_jinja_rendering_catchup_false(self, create_log_template, create_task_instance, logical_date):
"""Test the Jinja template rendering with catchup=False (the new default behavior)"""
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
run_after=DEFAULT_DATE,
logical_date=logical_date,
)
        # With catchup=False:
        # - If logical_date is None, the current date is used as the logical date
        # - If logical_date is explicitly provided, that date is used regardless of the catchup setting
expected_date = logical_date if logical_date is not None else filename_rendering_ti.logical_date
expected_filename = (
f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
f"{expected_date.isoformat()}/42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
class TestLogUrl:
def test_log_retrieval_valid(self, create_task_instance):
log_url_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
log_url_ti.hostname = "hostname"
actual = FileTaskHandler("")._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
assert actual == ("http://hostname:8793/log/DYNAMIC_PATH", "DYNAMIC_PATH")
def test_log_retrieval_valid_trigger(self, create_task_instance):
ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
)
ti.hostname = "hostname"
trigger = Trigger("", {})
job = Job(TriggererJobRunner.job_type)
job.id = 123
trigger.triggerer_job = job
ti.trigger = trigger
actual = FileTaskHandler("")._get_log_retrieval_url(ti, "DYNAMIC_PATH", log_type=LogType.TRIGGER)
hostname = get_hostname()
assert actual == (
f"http://{hostname}:8794/log/DYNAMIC_PATH.trigger.123.log",
"DYNAMIC_PATH.trigger.123.log",
)
log_sample = """[2022-11-16T00:05:54.278-0800] {taskinstance.py:1257} INFO -
--------------------------------------------------------------------------------
[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1
[2022-11-16T00:05:54.279-0800] {taskinstance.py:1259} INFO -
--------------------------------------------------------------------------------
[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00
[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']
[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait
[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan
[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=simple_async_timedelta
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00
[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554
"""
def test_parse_timestamps():
actual = []
for timestamp, _, _ in _parse_log_lines(log_sample.splitlines()):
actual.append(timestamp)
assert actual == [
pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
pendulum.parse("2022-11-16T00:05:54.279000-08:00"),
pendulum.parse("2022-11-16T00:05:54.279000-08:00"),
pendulum.parse("2022-11-16T00:05:54.295000-08:00"),
pendulum.parse("2022-11-16T00:05:54.300000-08:00"),
pendulum.parse("2022-11-16T00:05:54.300000-08:00"), # duplicate
pendulum.parse("2022-11-16T00:05:54.300000-08:00"), # duplicate
pendulum.parse("2022-11-16T00:05:54.306000-08:00"),
pendulum.parse("2022-11-16T00:05:54.309000-08:00"),
pendulum.parse("2022-11-16T00:05:54.457000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
pendulum.parse("2022-11-16T00:05:54.604000-08:00"),
]
def test_interleave_interleaves():
log_sample1 = "\n".join(
[
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
]
)
log_sample2 = "\n".join(
[
"[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",
"[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']",
"[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",
]
)
log_sample3 = "\n".join(
[
"[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",
"[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow",
"AIRFLOW_CTX_DAG_ID=simple_async_timedelta",
"AIRFLOW_CTX_TASK_ID=wait",
"AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00",
"AIRFLOW_CTX_TRY_NUMBER=1",
"AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00",
"[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",
]
)
# -08:00
tz = pendulum.tz.fixed_timezone(-28800)
DateTime = pendulum.DateTime
expected = [
{
"event": "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 278000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - "
"Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 "
"08:05:52.324532+00:00",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 295000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - "
"Started process 52536 to run task",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 300000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - "
"Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', "
"'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', "
"'33648', '--raw', '--subdir', "
"'/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', "
"'--cfg-path', "
"'/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 306000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - "
"Job 33648: Subtask wait",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 309000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - "
"Running <TaskInstance: simple_async_timedelta.wait "
"manual__2022-11-16T08:05:52.324532+00:00 [running]> on host "
"daniels-mbp-2.lan",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 457000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - "
"Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz),
},
{
"event": "AIRFLOW_CTX_DAG_ID=simple_async_timedelta",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz),
},
{
"event": "AIRFLOW_CTX_TASK_ID=wait",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz),
},
{
"event": "AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz),
},
{
"event": "AIRFLOW_CTX_TRY_NUMBER=1",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz),
},
{
"event": "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz),
},
{
"event": "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - "
"Pausing task as DEFERRED. dag_id=simple_async_timedelta, "
"task_id=wait, execution_date=20221116T080552, "
"start_date=20221116T080554",
"timestamp": DateTime(2022, 11, 16, 0, 5, 54, 604000, tzinfo=tz),
},
]
    # Use a TypeAdapter to turn the messages into dicts -- easier to compare in tests than a bunch of objects
results = TypeAdapter(list[StructuredLogMessage]).dump_python(
_interleave_logs(log_sample2, log_sample1, log_sample3)
)
    # TypeAdapter returns a generator when its input is a generator. Nice, but not useful for testing
results = list(results)
assert results == expected
def test_interleave_logs_correct_ordering():
"""
    Notice there are two messages with timestamp `2023-01-17T12:47:11.883-0800`.
    They should appear in the correct order and be deduplicated in the result.
"""
sample_with_dupe = """[2023-01-17T12:46:55.868-0800] {temporal.py:62} INFO - trigger starting
[2023-01-17T12:46:55.868-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-17T12:47:09.882-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-17T12:47:10.882-0800] {temporal.py:71} INFO - sleeping 1 second...
[2023-01-17T12:47:11.883-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))
[2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2023-01-17T20:47:11.254388+00:00> (ID 1) fired: TriggerEvent<DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))>
"""
logs = events(_interleave_logs(sample_with_dupe, "", sample_with_dupe))
assert sample_with_dupe == "\n".join(logs)
def test_interleave_logs_correct_dedupe():
sample_without_dupe = """test,
test,
test,
test,
test,
test,
test,
test,
test,
test"""
logs = events(_interleave_logs(",\n ".join(["test"] * 10)))
assert sample_without_dupe == "\n".join(logs)
def test_permissions_for_new_directories(tmp_path):
    # Set umask to 0o027: strip write from group and all permissions from others
old_umask = os.umask(0o027)
try:
base_dir = tmp_path / "base"
base_dir.mkdir()
log_dir = base_dir / "subdir1" / "subdir2"
        # Force permissions for the new folders to owner rwx only; group and others get nothing (0o700)
new_folder_permissions = 0o700
        # Directories created without explicit permissions get the umask-derived default: owner rwx, group r-x, others nothing (0o750)
default_permissions = 0o750
FileTaskHandler._prepare_log_folder(log_dir, new_folder_permissions)
assert log_dir.exists()
assert log_dir.is_dir()
assert log_dir.stat().st_mode % 0o1000 == new_folder_permissions
assert log_dir.parent.stat().st_mode % 0o1000 == new_folder_permissions
assert base_dir.stat().st_mode % 0o1000 == default_permissions
finally:
os.umask(old_umask)
worker_url = "http://10.240.5.168:8793"
log_location = "dag_id=sample/run_id=manual__2024-05-23T07:18:59.298882+00:00/task_id=sourcing/attempt=1.log"
log_url = f"{worker_url}/log/{log_location}"
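# The tests below exercise proxy handling in _fetch_logs_from_service: the worker IP above
# falls inside the CIDR no_proxy entry (10.0.0.0/8) used in one test, but does not match the
# hostname entry ("localhost") used in the other.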
@mock.patch("requests.adapters.HTTPAdapter.send")
def test_fetch_logs_from_service_with_not_matched_no_proxy(mock_send, monkeypatch):
monkeypatch.setenv("http_proxy", "http://proxy.example.com")
monkeypatch.setenv("no_proxy", "localhost")
response = Response()
response.status_code = HTTPStatus.OK
mock_send.return_value = response
_fetch_logs_from_service(log_url, log_location)
mock_send.assert_called()
_, kwargs = mock_send.call_args
assert "proxies" in kwargs
proxies = kwargs["proxies"]
assert "http" in proxies.keys()
assert "no" in proxies.keys()
@mock.patch("requests.adapters.HTTPAdapter.send")
def test_fetch_logs_from_service_with_cidr_no_proxy(mock_send, monkeypatch):
monkeypatch.setenv("http_proxy", "http://proxy.example.com")
monkeypatch.setenv("no_proxy", "10.0.0.0/8")
response = Response()
response.status_code = HTTPStatus.OK
mock_send.return_value = response
_fetch_logs_from_service(log_url, log_location)
mock_send.assert_called()
_, kwargs = mock_send.call_args
assert "proxies" in kwargs
proxies = kwargs["proxies"]
assert "http" not in proxies.keys()
assert "no" not in proxies.keys()