blob: 6fd0c07db540072adafb0511c90ef7dd478d7898 [file] [log] [blame]
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Define an operator extra link
=============================
For each operator, you can define its own extra links that can
redirect users to external systems. The extra link buttons
will be available on the task page:
.. image:: ../img/operator_extra_link.png
The following code shows how to add extra links to an operator:
.. note::
In order for extra links to be rendered you must be using the
``RBAC UI``, *NOT* the ``Flask UI``.
.. code-block:: python
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.utils.decorators import apply_defaults
class GoogleLink(BaseOperatorLink):
name = 'Google'
def get_link(self, operator, dttm):
return "https://www.google.com"
class MyFirstOperator(BaseOperator):
operator_extra_links = (
GoogleLink(),
)
@apply_defaults
def __init__(self, *args, **kwargs):
super(MyFirstOperator, self).__init__(*args, **kwargs)
def execute(self, context):
self.log.info("Hello World!")
You can also add a global operator extra link that will be available to
all the operators through an airflow plugin. Learn more about it in the
:ref:`plugin example <plugin-example>`.
Add or override Links to Existing Operators
-------------------------------------------
You can also add (or override) an extra link to an existing operators
through an Airflow plugin.
For example, the following Airflow plugin will add an Operator Link on all
tasks using :class:`~airflow.operators.gcs_to_s3.GoogleCloudStorageToS3Operator` operator.
**Adding Operator Links to Existing Operators**
``plugins/extra_link.py``:
.. code-block:: python
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperatorLink
from airflow.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
class S3LogLink(BaseOperatorLink):
name = 'S3'
# Add list of all the operators to which you want to add this OperatorLinks
# Example: operators = [GoogleCloudStorageToS3Operator, GoogleCloudStorageToBigQueryOperator]
operators = [GoogleCloudStorageToS3Operator]
def get_link(self, operator, dttm):
return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
dag_id=operator.dag_id,
task_id=operator.task_id,
execution_date=dttm,
)
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [S3LogLink(), ]
**Overriding Operator Links of Existing Operators**:
It is also possible to replace a built in link on an operator via a Plugin. For example
:class:`~airflow.gcp.operators.bigquery.BigQueryOperator` includes a link to the GCP
console, but if we wanted to change that link we could:
.. code-block:: python
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperatorLink
from airflow.gcp.operators.bigquery import BigQueryOperator
# Change from https to http just to display the override
BIGQUERY_JOB_DETAILS_LINK_FMT = 'http://console.cloud.google.com/bigquery?j={job_id}'
class BigQueryConsoleLink(BaseOperatorLink):
"""
Helper class for constructing BigQuery link.
"""
name = 'BigQuery Console'
operators = [BigQueryOperator]
def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ''
# Defining the plugin class
class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [BigQueryConsoleLink(), ]