.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.
Google Cloud BigQuery Operators
===============================
`BigQuery <https://cloud.google.com/bigquery/>`__ is Google's fully managed, petabyte-scale,
low-cost analytics data warehouse. It is a serverless Software as a Service (SaaS)
that doesn't need a database administrator. It allows users to focus on
analyzing data to find meaningful insights using familiar SQL.
Airflow provides operators to manage datasets and tables, run queries and validate
data.
.. contents::
:depth: 1
:local:
Prerequisite Tasks
^^^^^^^^^^^^^^^^^^
.. include:: /operators/_partials/prerequisite_tasks.rst
Manage datasets
^^^^^^^^^^^^^^^
.. _howto/operator:BigQueryCreateEmptyDatasetOperator:
Create dataset
""""""""""""""
To create an empty dataset in a BigQuery database you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_dataset]
:end-before: [END howto_operator_bigquery_create_dataset]
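As an illustration, a minimal sketch of how this operator might be instantiated
(the dataset and project IDs below are hypothetical placeholders):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id="my_dataset",  # hypothetical dataset ID
        project_id="my-project",  # hypothetical; defaults to the project of the GCP connection
        location="US",
    )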
.. _howto/operator:BigQueryGetDatasetOperator:
Get dataset details
"""""""""""""""""""
To get the details of an existing dataset you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetOperator`.
This operator returns a `Dataset Resource <https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource>`__.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_get_dataset]
:end-before: [END howto_operator_bigquery_get_dataset]
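A minimal sketch with a hypothetical dataset ID; the returned resource is pushed
to XCom, so downstream tasks can read it:

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDatasetOperator

    get_dataset = BigQueryGetDatasetOperator(
        task_id="get_dataset",
        dataset_id="my_dataset",  # hypothetical dataset ID
    )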
.. _howto/operator:BigQueryGetDatasetTablesOperator:
List tables in dataset
""""""""""""""""""""""
To retrieve the list of tables in a given dataset use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_get_dataset_tables]
:end-before: [END howto_operator_bigquery_get_dataset_tables]
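A minimal sketch, again with a hypothetical dataset ID:

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDatasetTablesOperator

    get_dataset_tables = BigQueryGetDatasetTablesOperator(
        task_id="get_dataset_tables",
        dataset_id="my_dataset",  # hypothetical dataset ID
    )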
.. _howto/operator:BigQueryPatchDatasetOperator:
Patch dataset
"""""""""""""
To patch a dataset in BigQuery you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator`.
Note that this operator only replaces fields that are provided in the submitted
dataset resource.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_patch_dataset]
:end-before: [END howto_operator_bigquery_patch_dataset]
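For instance, a sketch that patches only the dataset description and leaves all
other fields untouched (the IDs and description are hypothetical):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryPatchDatasetOperator

    patch_dataset = BigQueryPatchDatasetOperator(
        task_id="patch_dataset",
        dataset_id="my_dataset",  # hypothetical dataset ID
        dataset_resource={"description": "Patched description"},  # only this field is replaced
    )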
.. _howto/operator:BigQueryUpdateDatasetOperator:
Update dataset
""""""""""""""
To update a dataset in BigQuery you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator`.
The update method replaces the entire dataset resource, whereas the patch
method only replaces fields that are provided in the submitted dataset resource.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_update_dataset]
:end-before: [END howto_operator_bigquery_update_dataset]
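A sketch under the same hypothetical IDs; since the update method replaces the
entire resource, any writable field omitted from ``dataset_resource`` is cleared:

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryUpdateDatasetOperator

    update_dataset = BigQueryUpdateDatasetOperator(
        task_id="update_dataset",
        dataset_id="my_dataset",  # hypothetical dataset ID
        dataset_resource={"description": "Replaced dataset resource"},
    )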
.. _howto/operator:BigQueryDeleteDatasetOperator:
Delete dataset
""""""""""""""
To delete an existing dataset from a BigQuery database you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_dataset]
:end-before: [END howto_operator_bigquery_delete_dataset]
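A minimal sketch; ``delete_contents`` is worth noting because a non-empty
dataset cannot be deleted without it (the dataset ID is hypothetical):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteDatasetOperator

    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset",
        dataset_id="my_dataset",  # hypothetical dataset ID
        delete_contents=True,  # also drop any tables left in the dataset
    )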
Manage tables
^^^^^^^^^^^^^
.. _howto/operator:BigQueryCreateEmptyTableOperator:
Create native table
"""""""""""""""""""
To create a new, empty table in the given BigQuery dataset, optionally with a
schema, you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator`.
The schema to be used for the BigQuery table may be specified in one of two
ways. You may either directly pass the schema fields in, or you may point the
operator to a Google Cloud Storage object name. The object in Google Cloud
Storage must be a JSON file with the schema fields in it.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_table]
:end-before: [END howto_operator_bigquery_create_table]
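A sketch of the first approach, passing the schema fields in directly (all
names are hypothetical):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id="my_dataset",  # hypothetical dataset ID
        table_id="my_table",  # hypothetical table ID
        schema_fields=[
            {"name": "name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )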
You can use this operator to create a view on top of an existing table.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_view]
:end-before: [END howto_operator_bigquery_create_view]
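A sketch of creating a view with the same operator via its ``view`` parameter
(the query and IDs are hypothetical):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

    create_view = BigQueryCreateEmptyTableOperator(
        task_id="create_view",
        dataset_id="my_dataset",  # hypothetical dataset ID
        table_id="my_view",  # hypothetical view name
        view={
            "query": "SELECT name FROM my_dataset.my_table",  # hypothetical query
            "useLegacySql": False,
        },
    )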
.. _howto/operator:BigQueryCreateExternalTableOperator:
Create external table
"""""""""""""""""""""
To create a new external table in a dataset with the data in Google Cloud Storage
you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator`.
Similarly to
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator`,
you may either pass the schema fields in directly, or point the operator
to a Google Cloud Storage object name.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_external_table]
:end-before: [END howto_operator_bigquery_create_external_table]
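A sketch assuming a CSV file in a hypothetical bucket, with the schema passed
in directly:

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

    create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        bucket="my-bucket",  # hypothetical GCS bucket
        source_objects=["data/my_data.csv"],  # hypothetical object path
        destination_project_dataset_table="my_dataset.my_external_table",
        schema_fields=[
            {"name": "name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )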
.. _howto/operator:BigQueryGetDataOperator:
Fetch data from table
"""""""""""""""""""""
To fetch data from a BigQuery table you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator`.
Alternatively, you can fetch data for selected columns by passing the field
names to ``selected_fields``.
This operator returns the data in a Python list, where the number of elements
in the returned list is equal to the number of rows fetched. Each element in
the list is itself a list whose elements represent the column values for
that row.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 8
:start-after: [START howto_operator_bigquery_get_data]
:end-before: [END howto_operator_bigquery_get_data]
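A sketch fetching up to ten rows of two selected columns (the IDs and field
names are hypothetical):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator

    get_data = BigQueryGetDataOperator(
        task_id="get_data",
        dataset_id="my_dataset",  # hypothetical dataset ID
        table_id="my_table",  # hypothetical table ID
        max_results=10,
        selected_fields="name,salary",  # comma-separated column names
    )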
.. _howto/operator:BigQueryUpsertTableOperator:
Upsert table
""""""""""""
To upsert a table you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryUpsertTableOperator`.
This operator either updates the existing table or creates a new, empty table
in the given dataset.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_upsert_table]
:end-before: [END howto_operator_bigquery_upsert_table]
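A sketch with hypothetical IDs; if ``my_table`` already exists it is updated,
otherwise it is created:

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryUpsertTableOperator

    upsert_table = BigQueryUpsertTableOperator(
        task_id="upsert_table",
        dataset_id="my_dataset",  # hypothetical dataset ID
        table_resource={
            "tableReference": {"tableId": "my_table"},
            "friendlyName": "My upserted table",
        },
    )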
.. _howto/operator:BigQueryDeleteTableOperator:
Delete table
""""""""""""
To delete an existing table you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_table]
:end-before: [END howto_operator_bigquery_delete_table]
You can also use this operator to delete a view.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_view]
:end-before: [END howto_operator_bigquery_delete_view]
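In both cases the target is addressed by a fully qualified
``<project>.<dataset>.<table>`` name, as in this sketch (hypothetical IDs):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator

    delete_table = BigQueryDeleteTableOperator(
        task_id="delete_table",
        deletion_dataset_table="my-project.my_dataset.my_table",  # hypothetical
    )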
.. _howto/operator:BigQueryInsertJobOperator:
Execute BigQuery jobs
^^^^^^^^^^^^^^^^^^^^^
Let's say you would like to execute the following query.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 0
:start-after: [START howto_operator_bigquery_query]
:end-before: [END howto_operator_bigquery_query]
To execute the SQL query in a specific BigQuery database you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator` with
a proper query job configuration, which can be Jinja templated.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 8
:start-after: [START howto_operator_bigquery_insert_job]
:end-before: [END howto_operator_bigquery_insert_job]
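A minimal sketch of a query job configuration (the SQL and table names are
placeholders):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    insert_query_job = BigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration={
            "query": {
                "query": "SELECT COUNT(*) FROM my_dataset.my_table",
                "useLegacySql": False,
            }
        },
    )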
For more information on the types of BigQuery jobs, please check the
`documentation <https://cloud.google.com/bigquery/docs/reference/v2/jobs>`__.
If you want to include some files in your configuration, you can use the ``include`` clause
of the Jinja templating language as follows:
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 8
:start-after: [START howto_operator_bigquery_select_job]
:end-before: [END howto_operator_bigquery_select_job]
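A sketch of this pattern, assuming a hypothetical ``my_query.sql`` file that
the Jinja environment can locate (for example, next to the DAG file):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    select_query_job = BigQueryInsertJobOperator(
        task_id="select_query_job",
        configuration={
            "query": {
                "query": "{% include 'my_query.sql' %}",  # rendered at runtime
                "useLegacySql": False,
            }
        },
    )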
The included file can also use Jinja templates, which can be useful in the case of ``.sql`` files.
Additionally, you can use the ``job_id`` parameter of
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator` to improve
idempotency. If this parameter is not passed, a UUID will be used as the ``job_id``. If provided,
the operator will try to submit a new job with this ``job_id``. If a job with this ``job_id``
already exists, the operator will reattach to it.
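For example, a sketch with an explicit, templated ``job_id`` (the prefix is a
hypothetical choice):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    idempotent_job = BigQueryInsertJobOperator(
        task_id="idempotent_job",
        job_id="my_job_{{ ds_nodash }}",  # templated so each run gets its own job ID
        configuration={
            "query": {
                "query": "SELECT COUNT(*) FROM my_dataset.my_table",
                "useLegacySql": False,
            }
        },
    )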
Validate data
^^^^^^^^^^^^^
.. _howto/operator:BigQueryCheckOperator:
Check if query result has data
""""""""""""""""""""""""""""""
To perform checks against BigQuery you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator`.
This operator expects a SQL query that will return a single row. Each value in
that first row is evaluated using Python ``bool`` casting. If any of the values
is ``False``, the check fails and the task errors out.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 8
:start-after: [START howto_operator_bigquery_check]
:end-before: [END howto_operator_bigquery_check]
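A sketch that fails when a hypothetical table is empty, since ``bool(0)`` is
``False``:

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator

    check_count = BigQueryCheckOperator(
        task_id="check_count",
        sql="SELECT COUNT(*) FROM my_dataset.my_table",  # hypothetical table
        use_legacy_sql=False,
    )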
.. _howto/operator:BigQueryValueCheckOperator:
Compare query result to pass value
""""""""""""""""""""""""""""""""""
To perform a simple value check using SQL code you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator`.
This operator expects a SQL query that will return a single row. Each value in
that first row is evaluated against ``pass_value``, which can be either a string
or a numeric value. If numeric, you can also specify ``tolerance``.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 8
:start-after: [START howto_operator_bigquery_value_check]
:end-before: [END howto_operator_bigquery_value_check]
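A sketch with a numeric ``pass_value`` and a 10% ``tolerance`` (hypothetical
table): row counts between 90 and 110 would pass.

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator

    check_value = BigQueryValueCheckOperator(
        task_id="check_value",
        sql="SELECT COUNT(*) FROM my_dataset.my_table",  # hypothetical table
        pass_value=100,
        tolerance=0.1,  # accept values within pass_value * (1 +/- tolerance)
        use_legacy_sql=False,
    )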
.. _howto/operator:BigQueryIntervalCheckOperator:
Compare metrics over time
"""""""""""""""""""""""""
To check that the values of metrics given as SQL expressions are within a certain
tolerance of the ones from ``days_back`` before, you can use
:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
:language: python
:dedent: 8
:start-after: [START howto_operator_bigquery_interval_check]
:end-before: [END howto_operator_bigquery_interval_check]
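A sketch comparing today's row count against the one from seven days ago,
assuming a hypothetical table with a ``ds`` date column (the default
``date_filter_column``):

.. code-block:: python

    from airflow.providers.google.cloud.operators.bigquery import BigQueryIntervalCheckOperator

    check_interval = BigQueryIntervalCheckOperator(
        task_id="check_interval",
        table="my_dataset.my_table",  # hypothetical table
        days_back=-7,
        metrics_thresholds={"COUNT(*)": 1.5},  # max allowed ratio between the two counts
    )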
Sensors
^^^^^^^
Check that a Table exists
"""""""""""""""""""""""""
To check that a table exists you can use
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceSensor`.
This allows delaying the execution of downstream operators until the table exists.
If the table is sharded by date, you can, for instance, use the ``{{ ds_nodash }}``
macro as the table name suffix.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_bigquery_table]
:end-before: [END howto_sensor_bigquery_table]
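A sketch waiting for a hypothetical date-sharded table, using ``{{ ds_nodash }}``
as the suffix:

.. code-block:: python

    from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor

    wait_for_table = BigQueryTableExistenceSensor(
        task_id="wait_for_table",
        project_id="my-project",  # hypothetical project ID
        dataset_id="my_dataset",  # hypothetical dataset ID
        table_id="my_table_{{ ds_nodash }}",  # e.g. my_table_20200101
    )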
Check that a Table Partition exists
"""""""""""""""""""""""""""""""""""
To check that a table exists and has a partition you can use
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTablePartitionExistenceSensor`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_bigquery_table_partition]
:end-before: [END howto_sensor_bigquery_table_partition]
For DAY partitioned tables, the ``partition_id`` parameter is a string in the ``"%Y%m%d"`` format.
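A sketch waiting for the current day's partition of a hypothetical table:

.. code-block:: python

    from airflow.providers.google.cloud.sensors.bigquery import BigQueryTablePartitionExistenceSensor

    wait_for_partition = BigQueryTablePartitionExistenceSensor(
        task_id="wait_for_partition",
        project_id="my-project",  # hypothetical project ID
        dataset_id="my_dataset",  # hypothetical dataset ID
        table_id="my_table",  # hypothetical table ID
        partition_id="{{ ds_nodash }}",  # "%Y%m%d", e.g. 20200101
    )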
Reference
^^^^^^^^^
For further information, look at:
* `Client Library Documentation <https://googleapis.dev/python/bigquery/latest/index.html>`__
* `Product Documentation <https://cloud.google.com/bigquery/docs/>`__