Updating Airflow

This file documents any backwards-incompatible changes in Airflow and assists people when migrating to a new version.

Airflow 1.9

SSH Hook updates, along with new SSH Operator & SFTP Operator

SSH Hook now uses Paramiko library to create ssh client connection, instead of sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.

  • update SSHHook constructor
  • use SSHOperator class in place of SSHExecuteOperator which is removed now. Refer test_ssh_operator.py for usage info.
  • SFTPOperator is added to perform secure file transfer from serverA to serverB. Refer test_sftp_operator.py.py for usage info.
  • No updates are required if you are using ftpHook, it will continue work as is.

S3Hook switched to use Boto3

The airflow.hooks.S3_hook.S3Hook has been switched to use boto3 instead of the older boto (a.k.a. boto2). This result in a few backwards incompatible changes to the following classes: S3Hook:

  • the constructors no longer accepts s3_conn_id. It is now called aws_conn_id.
  • the default conneciton is now “aws_default” instead of “s3_default”
  • the return type of objects returned by get_bucket is now boto3.s3.Bucket
  • the return type of get_key, and get_wildcard_key is now an boto3.S3.Object.

If you are using any of these in your DAGs and specify a connection ID you will need to update the parameter name for the connection to “aws_conn_id”: S3ToHiveTransfer, S3PrefixSensor, S3KeySensor, RedshiftToS3Transfer.

Logging update

The logging structure of Airflow has been rewritten to make configuration easier and the logging system more transparent.

A quick recap about logging

A logger is the entry point into the logging system. Each logger is a named bucket to which messages can be written for processing. A logger is configured to have a log level. This log level describes the severity of the messages that the logger will handle. Python defines the following log levels: DEBUG, INFO, WARNING, ERROR or CRITICAL.

Each message that is written to the logger is a Log Record. Each log record also has a log level indicating the severity of that specific message. A log record can also contain useful metadata that describes the event that is being logged. This can include details such as a stack trace or an error code.

When a message is given to the logger, the log level of the message is compared to the log level of the logger. If the log level of the message meets or exceeds the log level of the logger itself, the message will undergo further processing. If it doesn’t, the message will be ignored.

Once a logger has determined that a message needs to be processed, it is passed to a Handler. This configuration is now more flexible and can be easily be maintained in a single file.

Changes in Airflow Logging

Airflow's logging mechanism has been refactored to uses Python’s builtin logging module to perform logging of the application. By extending classes with the existing LoggingMixin, all the logging will go through a central logger. Also the BaseHook and BaseOperator already extends this class, so it is easily available to do logging.

The main benefit is easier configuration of the logging by setting a single centralized python file. Disclaimer; there is still some inline configuration, but this will be removed eventually. The new logging class is defined by setting the dotted classpath in your ~/airflow/airflow.cfg file:

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
logging_config_class = my.path.default_local_settings.LOGGING_CONFIG

The logging configuration file needs to be on the PYTHONPATH, for example $AIRFLOW_HOME/config. This directory is loaded by default. Of course you are free to add any directory to the PYTHONPATH, this might be handy when you have the config in another directory or you mount a volume in case of Docker.

You can take the config from airflow/config_templates/airflow_local_settings.py as a starting point. Copy the contents to ${AIRFLOW_HOME}/config/airflow_local_settings.py, and alter the config as you like.

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        # When using s3 or gcs, provide a customized LOGGING_CONFIG
        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
        # for details
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['file.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
    }
}

If you want to customize the logging (for example, use logging rotate), you can do this by defining one or more of the logging handles that Python has to offer. For more details about the Python logging, please refer to the official logging documentation.

Furthermore, this change also simplifies logging within the DAG itself:

root@ae1bc863e815:/airflow# python
Python 3.6.2 (default, Sep 13 2017, 14:26:54)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.settings import *
>>>
>>> from datetime import datetime
>>> from airflow import DAG
>>> from airflow.operators.dummy_operator import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
>>>
>>> task = DummyOperator(task_id='task_1', dag=dag)
>>>
>>> task.log.error('I want to say something..')
[2017-09-25 20:17:04,927] {<stdin>:1} ERROR - I want to say something..

Template path of the file_task_handler

The file_task_handler logger is more flexible. You can change the default format, {dag_id}/{task_id}/{execution_date}/{try_number}.log by supplying Jinja templating in the FILENAME_TEMPLATE configuration variable. See the file_task_handler for more information.

I'm using S3Log or GCSLogs, what do I do!?

If you are logging to Google cloud storage, please see the Google cloud platform documentation for logging instructions.

If you are using S3, the instructions should be largely the same as the Google cloud platform instructions above. You will need a custom logging config. The REMOTE_BASE_LOG_FOLDER configuration key in your airflow config has been removed, therefore you will need to take the following steps:

  • Copy the logging configuration from airflow/config_templates/airflow_logging_settings.py and copy it.
  • Place it in a directory inside the Python import path PYTHONPATH. If you are using Python 2.7, ensuring that any __init__.py files exist so that it is importable.
  • Update the config by setting the path of REMOTE_BASE_LOG_FOLDER explicitly in the config. The REMOTE_BASE_LOG_FOLDER key is not used anymore.
  • Set the logging_config_class to the filename and dict. For example, if you place custom_logging_config.py on the base of your pythonpath, you will need to set logging_config_class = custom_logging_config.LOGGING_CONFIG in your config as Airflow 1.8.

New Features

Dask Executor

A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.

Deprecated Features

These features are marked for deprecation. They may still work (and raise a DeprecationWarning), but are no longer supported and will be removed entirely in Airflow 2.0

  • If you‘re using the google_cloud_conn_id or dataproc_cluster argument names explicitly in contrib.operators.Dataproc{*}Operator(s), be sure to rename them to gcp_conn_id or cluster_name, respectively. We’ve renamed these arguments for consistency. (AIRFLOW-1323)

  • post_execute() hooks now take two arguments, context and result (AIRFLOW-886)

    Previously, post_execute() only took one argument, context.

  • contrib.hooks.gcp_dataflow_hook.DataFlowHook starts to use --runner=DataflowRunner instead of DataflowPipelineRunner, which is removed from the package google-cloud-dataflow-0.6.0.

  • The pickle type for XCom messages has been replaced by json to prevent RCE attacks. Note that JSON serialization is stricter than pickling, so if you want to e.g. pass raw bytes through XCom you must encode them using an encoding like base64. By default pickling is still enabled until Airflow 2.0. To disable it Set enable_xcom_pickling = False in your Airflow config.

Airflow 1.8.1

The Airflow package name was changed from airflow to apache-airflow during this release. You must uninstall your previously installed version of Airflow before installing 1.8.1.

Airflow 1.8

Database

The database schema needs to be upgraded. Make sure to shutdown Airflow and make a backup of your database. To upgrade the schema issue airflow upgradedb.

Upgrade systemd unit files

Systemd unit files have been updated. If you use systemd please make sure to update these.

Please note that the webserver does not detach properly, this will be fixed in a future version.

Tasks not starting although dependencies are met due to stricter pool checking

Airflow 1.7.1 has issues with being able to over subscribe to a pool, ie. more slots could be used than were available. This is fixed in Airflow 1.8.0, but due to past issue jobs may fail to start although their dependencies are met after an upgrade. To workaround either temporarily increase the amount of slots above the the amount of queued tasks or use a new pool.

Less forgiving scheduler on dynamic start_date

Using a dynamic start_date (e.g. start_date = datetime.now()) is not considered a best practice. The 1.8.0 scheduler is less forgiving in this area. If you encounter DAGs not being scheduled you can try using a fixed start_date and renaming your dag. The last step is required to make sure you start with a clean slate, otherwise the old schedule can interfere.

New and updated scheduler options

Please read through these options, defaults have changed since 1.7.1.

child_process_log_directory

In order the increase the robustness of the scheduler, DAGS our now processed in their own process. Therefore each DAG has its own log file for the scheduler. These are placed in child_process_log_directory which defaults to <AIRFLOW_HOME>/scheduler/latest. You will need to make sure these log files are removed.

DAG logs or processor logs ignore and command line settings for log file locations.

run_duration

Previously the command line option num_runs was used to let the scheduler terminate after a certain amount of loops. This is now time bound and defaults to -1, which means run continuously. See also num_runs.

num_runs

Previously num_runs was used to let the scheduler terminate after a certain amount of loops. Now num_runs specifies the number of times to try to schedule each DAG file within run_duration time. Defaults to -1, which means try indefinitely. This is only available on the command line.

min_file_process_interval

After how much time should an updated DAG be picked up from the filesystem.

dag_dir_list_interval

How often the scheduler should relist the contents of the DAG directory. If you experience that while developing your dags are not being picked up, have a look at this number and decrease it when necessary.

catchup_by_default

By default the scheduler will fill any missing interval DAG Runs between the last execution date and the current date. This setting changes that behavior to only execute the latest interval. This can also be specified per DAG as catchup = False / True. Command line backfills will still work.

Faulty Dags do not show an error in the Web UI

Due to changes in the way Airflow processes DAGs the Web UI does not show an error when processing a faulty DAG. To find processing errors go the child_process_log_directory which defaults to <AIRFLOW_HOME>/scheduler/latest.

New DAGs are paused by default

Previously, new DAGs would be scheduled immediately. To retain the old behavior, add this to airflow.cfg:

[core]
dags_are_paused_at_creation = False

Airflow Context variable are passed to Hive config if conf is specified

If you specify a hive conf to the run_cli command of the HiveHook, Airflow add some convenience variables to the config. In case your run a sceure Hadoop setup it might be required to whitelist these variables by adding the following to your configuration:

<property>
     <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
     <value>airflow\.ctx\..*</value>
</property>

Google Cloud Operator and Hook alignment

All Google Cloud Operators and Hooks are aligned and use the same client library. Now you have a single connection type for all kinds of Google Cloud Operators.

If you experience problems connecting with your operator make sure you set the connection type “Google Cloud Platform”.

Also the old P12 key file type is not supported anymore and only the new JSON key files are supported as a service account.

Deprecated Features

These features are marked for deprecation. They may still work (and raise a DeprecationWarning), but are no longer supported and will be removed entirely in Airflow 2.0

  • Hooks and operators must be imported from their respective submodules

    airflow.operators.PigOperator is no longer supported; from airflow.operators.pig_operator import PigOperator is. (AIRFLOW-31, AIRFLOW-200)

  • Operators no longer accept arbitrary arguments

    Previously, Operator.__init__() accepted any arguments (either positional *args or keyword **kwargs) without complaint. Now, invalid arguments will be rejected. (https://github.com/apache/incubator-airflow/pull/1285)

Known Issues

There is a report that the default of “-1” for num_runs creates an issue where errors are reported while parsing tasks. It was not confirmed, but a workaround was found by changing the default back to None.

To do this edit cli.py, find the following:

        'num_runs': Arg(
            ("-n", "--num_runs"),
            default=-1, type=int,
            help="Set the number of runs to execute before exiting"),

and change default=-1 to default=None. Please report on the mailing list if you have this issue.

Airflow 1.7.1.2

Changes to Configuration

Email configuration change

To continue using the default smtp email backend, change the email_backend line in your config file from:

[email]
email_backend = airflow.utils.send_email_smtp

to:

[email]
email_backend = airflow.utils.email.send_email_smtp

S3 configuration change

To continue using S3 logging, update your config file so:

s3_log_folder = s3://my-airflow-log-bucket/logs

becomes:

remote_base_log_folder = s3://my-airflow-log-bucket/logs
remote_log_conn_id = <your desired s3 connection>