blob: 76115a2f3a279b8cc53ee481f3a14c8eea716f7f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import logging.config
import os
import re
import unittest
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import set_context
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
# Fixed date used by the tests below both as the DAG ``start_date`` and as the
# ``execution_date`` of the DAG runs / task instances they create.
DEFAULT_DATE = datetime(2016, 1, 1)
# Name of the logger whose handlers ``test_default_task_logging_setup`` inspects.
TASK_LOGGER = 'airflow.task'
# Handler name the tests look for when locating the file task handler.
FILE_TASK_HANDLER = 'task'
class TestFileTaskLogHandler(unittest.TestCase):
    """Tests for the file task log handler configured by ``DEFAULT_LOGGING_CONFIG``.

    Every test starts and ends with empty ``DagRun`` / ``TaskInstance`` tables
    and with the default logging configuration re-applied.
    """

    def clean_up(self):
        """Delete all ``DagRun`` and ``TaskInstance`` rows."""
        with create_session() as session:
            session.query(DagRun).delete()
            session.query(TaskInstance).delete()

    def setUp(self):
        """Re-apply the default logging config and wipe DAG run / TI rows."""
        super().setUp()
        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
        logging.root.disabled = False
        self.clean_up()

    def tearDown(self):
        """Wipe DAG run / TI rows created by the test."""
        self.clean_up()
        super().tearDown()

    def _prepare_file_handler(self, ti):
        """Enable ``ti.log``, locate its file task handler and bind it to ``ti``.

        :param ti: task instance whose logger should be set up
        :return: ``(logger, file_handler)`` where ``file_handler`` is the
            handler named ``FILE_TASK_HANDLER`` after ``set_context`` was called
        """
        logger = ti.log
        ti.log.disabled = False

        file_handler = next(
            (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
        )
        assert file_handler is not None

        set_context(logger, ti)
        assert file_handler.handler is not None
        return logger, file_handler

    def _remove_log_file_on_exit(self, file_handler, log_filename):
        """Guarantee the generated log file is deleted even if the test fails.

        Cleanups run in reverse registration order, so the handler is closed
        first and the file is removed afterwards.
        """
        self.addCleanup(os.remove, log_filename)
        self.addCleanup(file_handler.close)

    def test_default_task_logging_setup(self):
        # We use file task handler by default.
        logger = logging.getLogger(TASK_LOGGER)
        handlers = logger.handlers
        assert len(handlers) == 1
        handler = handlers[0]
        assert handler.name == FILE_TASK_HANDLER

    def test_file_task_handler(self):
        def task_callable(ti, **kwargs):
            ti.log.info("test")

        dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)
        dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, execution_date=DEFAULT_DATE)
        task = PythonOperator(
            task_id='task_for_testing_file_log_handler',
            dag=dag,
            python_callable=task_callable,
        )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)

        _, file_handler = self._prepare_file_handler(ti)

        # We expect set_context generates a file locally.
        log_filename = file_handler.handler.baseFilename
        assert os.path.isfile(log_filename)
        # Registered only once the file is known to exist, so the cleanup
        # itself cannot fail on a missing file.
        self._remove_log_file_on_exit(file_handler, log_filename)
        assert log_filename.endswith("1.log"), log_filename

        ti.run(ignore_ti_state=True)

        file_handler.flush()
        file_handler.close()

        assert hasattr(file_handler, 'read')
        # Return value of read must be a tuple of list and list.
        logs, metadatas = file_handler.read(ti)
        assert isinstance(logs, list)
        assert isinstance(metadatas, list)
        assert len(logs) == 1
        assert len(logs) == len(metadatas)
        assert isinstance(metadatas[0], dict)
        target_re = r'\n\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\n'

        # We should expect our log line from the callable above to appear in
        # the logs we read back
        assert re.search(target_re, logs[0][0][-1]), "Logs were " + str(logs)

    def test_file_task_handler_running(self):
        def task_callable(ti, **kwargs):
            ti.log.info("test")

        dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)
        task = PythonOperator(
            task_id='task_for_testing_file_log_handler',
            dag=dag,
            python_callable=task_callable,
        )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        ti.try_number = 2
        ti.state = State.RUNNING

        logger, file_handler = self._prepare_file_handler(ti)

        # We expect set_context generates a file locally.
        log_filename = file_handler.handler.baseFilename
        assert os.path.isfile(log_filename)
        # The handler is deliberately left open while reading (the task is
        # "running"); the registered cleanup closes it before removing the file.
        self._remove_log_file_on_exit(file_handler, log_filename)
        assert log_filename.endswith("2.log"), log_filename

        logger.info("Test")

        # Return value of read must be a tuple of list and list.
        logs, metadatas = file_handler.read(ti)
        # Logs for running tasks should show up too.
        assert isinstance(logs, list)
        assert isinstance(metadatas, list)
        assert len(logs) == 2
        assert len(logs) == len(metadatas)
        assert isinstance(metadatas[0], dict)
class TestFilenameRendering(unittest.TestCase):
    """Check ``FileTaskHandler._render_filename`` for both template syntaxes."""

    def setUp(self):
        """Build the task instance whose log filename gets rendered."""
        rendering_dag = DAG('dag_for_testing_filename_rendering', start_date=DEFAULT_DATE)
        rendering_task = DummyOperator(task_id='task_for_testing_filename_rendering', dag=rendering_dag)
        self.ti = TaskInstance(task=rendering_task, execution_date=DEFAULT_DATE)

    def test_python_formatting(self):
        """A ``str.format``-style template is filled from the task instance."""
        iso_date = DEFAULT_DATE.isoformat()
        wanted = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % iso_date

        handler = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log')
        actual = handler._render_filename(self.ti, 42)

        assert wanted == actual

    def test_jinja_rendering(self):
        """A Jinja template yields the same path as the ``str.format`` one."""
        iso_date = DEFAULT_DATE.isoformat()
        wanted = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % iso_date

        handler = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log')
        actual = handler._render_filename(self.ti, 42)

        assert wanted == actual