| :mod:`airflow.contrib.operators.bigquery_operator` |
| ================================================== |
| |
| .. py:module:: airflow.contrib.operators.bigquery_operator |
| |
| .. autoapi-nested-parse:: |
| |
| This module contains Google BigQuery operators. |
| |
| |
| |
| Module Contents |
| --------------- |
| |
| .. data:: BIGQUERY_JOB_DETAILS_LINK_FMT |
| :annotation: = https://console.cloud.google.com/bigquery?j={job_id} |
| |
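| For illustration, this format string expands to a console URL when given a |
| BigQuery job id (the job id below is made up): :: |
| |
|     BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id='my-project:US.bquijob_1234abcd') |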
| |
| |
| .. py:class:: BigQueryConsoleLink |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperatorLink` |
| |
| Helper class for constructing a BigQuery link. |
| |
| .. attribute:: name |
| :annotation: = BigQuery Console |
| |
| |
| |
| |
| .. method:: get_link(self, operator, dttm) |
| |
| |
| |
| |
| .. py:class:: BigQueryConsoleIndexableLink(index) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperatorLink` |
| |
| Helper class for constructing a BigQuery link. |
| |
| .. attribute:: name |
| |
| |
| |
| |
| |
| .. method:: get_link(self, operator, dttm) |
| |
| |
| |
| |
| .. py:class:: BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=(), query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| Executes BigQuery SQL queries in a specific BigQuery database. |
| |
| :param bql: (Deprecated. Use the `sql` parameter instead.) The SQL code to be |
| executed (templated). |
| :type bql: Can receive a str representing a SQL statement, |
| a list of str (SQL statements), or a reference to a template file. |
| Template references are recognized by str ending in '.sql'. |
| :param sql: The SQL code to be executed (templated). |
| :type sql: Can receive a str representing a SQL statement, |
| a list of str (SQL statements), or a reference to a template file. |
| Template references are recognized by str ending in '.sql'. |
| :param destination_dataset_table: A dotted |
| ``(<project>.|<project>:)<dataset>.<table>`` that, if set, will store the results |
| of the query. (templated) |
| :type destination_dataset_table: str |
| :param write_disposition: Specifies the action that occurs if the destination table |
| already exists. (default: 'WRITE_EMPTY') |
| :type write_disposition: str |
| :param create_disposition: Specifies whether the job is allowed to create new tables. |
| (default: 'CREATE_IF_NEEDED') |
| :type create_disposition: str |
| :param allow_large_results: Whether to allow large results. |
| :type allow_large_results: bool |
| :param flatten_results: If true and query uses legacy SQL dialect, flattens |
| all nested and repeated fields in the query results. ``allow_large_results`` |
| must be ``true`` if this is set to ``false``. For standard SQL queries, this |
| flag is ignored and results are never flattened. |
| :type flatten_results: bool |
| :param bigquery_conn_id: reference to a specific BigQuery hook. |
| :type bigquery_conn_id: str |
| :param delegate_to: The account to impersonate, if any. |
| For this to work, the service account making the request must have domain-wide |
| delegation enabled. |
| :type delegate_to: str |
| :param udf_config: The User Defined Function configuration for the query. |
| See https://cloud.google.com/bigquery/user-defined-functions for details. |
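| |
| **Example** (a minimal sketch; the key follows the BigQuery |
| ``userDefinedFunctionResources`` entries and the URI is illustrative): :: |
| |
|     udf_config=[{'resourceUri': 'gs://my-bucket/udfs/my_udf.js'}] |
| |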
| :type udf_config: list |
| :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false). |
| :type use_legacy_sql: bool |
| :param maximum_billing_tier: Positive integer that serves as a multiplier |
| of the basic price. |
| Defaults to None, in which case it uses the value set in the project. |
| :type maximum_billing_tier: int |
| :param maximum_bytes_billed: Limits the bytes billed for this job. |
| Queries that will have bytes billed beyond this limit will fail |
| (without incurring a charge). If unspecified, this will be |
| set to your project default. |
| :type maximum_bytes_billed: float |
| :param api_resource_configs: a dictionary that contains 'configuration' params |
| applied to the Google BigQuery Jobs API: |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs |
| For example, {'query': {'useQueryCache': False}}. You can use it |
| if you need to provide params that are not supported by BigQueryOperator, |
| such as args. |
| :type api_resource_configs: dict |
| :param schema_update_options: Allows the schema of the destination |
| table to be updated as a side effect of the load job. |
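| |
| **Example** (an illustrative sketch; the allowed values come from the |
| BigQuery jobs API): :: |
| |
|     schema_update_options=['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'] |
| |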
| :type schema_update_options: Optional[Union[list, tuple, set]] |
| :param query_params: a list of dictionaries containing query parameter types and |
| values, passed to BigQuery. The structure of each dictionary should match |
| 'queryParameters' in Google BigQuery Jobs API: |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs. |
| For example, [{ 'name': 'corpus', 'parameterType': { 'type': 'STRING' }, |
| 'parameterValue': { 'value': 'romeoandjuliet' } }]. |
| :type query_params: list |
| :param labels: a dictionary containing labels for the job/query, |
| passed to BigQuery |
| :type labels: dict |
| :param priority: Specifies a priority for the query. |
| Possible values include INTERACTIVE and BATCH. |
| The default value is INTERACTIVE. |
| :type priority: str |
| :param time_partitioning: configure optional time partitioning fields i.e. |
| partition by field, type and expiration as per API specifications. |
| :type time_partitioning: dict |
| :param cluster_fields: Request that the result of this query be stored sorted |
| by one or more columns. This is only available in conjunction with |
| time_partitioning. The order of columns given determines the sort order. |
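| |
| **Example** (an illustrative sketch; the column names are made up): :: |
| |
|     cluster_fields=['customer_id', 'order_date'] |
| |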
| :type cluster_fields: list[str] |
| :param location: The geographic location of the job. Required except for |
| US and EU. See details at |
| https://cloud.google.com/bigquery/docs/locations#specifying_your_location |
| :type location: str |
| :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). |
| **Example**: :: |
| |
|     encryption_configuration = { |
|         "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" |
|     } |
| |
| :type encryption_configuration: dict |
| |
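| **Example** (a minimal usage sketch; the task id, SQL, and table names below |
| are illustrative and not part of the operator's API): :: |
| |
|     bq_to_summary = BigQueryOperator( |
|         task_id='bq_write_to_summary', |
|         sql='SELECT name, SUM(value) AS total FROM `my-project.my_dataset.events` GROUP BY name', |
|         destination_dataset_table='my-project.my_dataset.summary', |
|         write_disposition='WRITE_TRUNCATE', |
|         use_legacy_sql=False, |
|         bigquery_conn_id='bigquery_default', |
|         dag=dag) |
| |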
| .. attribute:: template_fields |
| :annotation: = ['bql', 'sql', 'destination_dataset_table', 'labels'] |
| |
| |
| |
| .. attribute:: template_ext |
| :annotation: = ['.sql'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #e4f0e8 |
| |
| |
| |
| .. attribute:: __serialized_fields |
| :annotation: :Optional[FrozenSet[str]] |
| |
| |
| |
| .. attribute:: operator_extra_links |
| |
| |
| Return operator extra links |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. method:: on_kill(self) |
| |
| |
| |
| |
| .. classmethod:: get_serialized_fields(cls) |
| |
| A serialized BigQueryOperator contains exactly these fields. |
| |
| |
| |
| |
| .. py:class:: BigQueryCreateEmptyTableOperator(dataset_id, table_id, project_id=None, schema_fields=None, gcs_schema_object=None, time_partitioning=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, labels=None, encryption_configuration=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| Creates a new, empty table in the specified BigQuery dataset, |
| optionally with schema. |
| |
| The schema to be used for the BigQuery table may be specified in one of |
| two ways. You may either directly pass the schema fields in, or you may |
| point the operator to a Google Cloud Storage object name. The object in |
| Google Cloud Storage must be a JSON file with the schema fields in it. |
| You can also create a table without a schema. |
| |
| :param project_id: The project in which to create the table. (templated) |
| :type project_id: str |
| :param dataset_id: The dataset in which to create the table. (templated) |
| :type dataset_id: str |
| :param table_id: The name of the table to be created. (templated) |
| :type table_id: str |
| :param schema_fields: If set, the schema field list as defined here: |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema |
| |
| **Example**: :: |
| |
| schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, |
| {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}] |
| |
| :type schema_fields: list |
| :param gcs_schema_object: Full path to the JSON file containing |
| schema (templated). For |
| example: ``gs://test-bucket/dir1/dir2/employee_schema.json`` |
| :type gcs_schema_object: str |
| :param time_partitioning: configure optional time partitioning fields i.e. |
| partition by field, type and expiration as per API specifications. |
| |
| .. seealso:: |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning |
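| |
| **Example** (an illustrative sketch; the keys follow the BigQuery |
| ``timePartitioning`` resource and the values are made up): :: |
| |
|     time_partitioning={'type': 'DAY', 'field': 'created_date'} |
| |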
| :type time_partitioning: dict |
| :param bigquery_conn_id: Reference to a specific BigQuery hook. |
| :type bigquery_conn_id: str |
| :param google_cloud_storage_conn_id: Reference to a specific Google |
| Cloud Storage hook. |
| :type google_cloud_storage_conn_id: str |
| :param delegate_to: The account to impersonate, if any. For this to |
| work, the service account making the request must have domain-wide |
| delegation enabled. |
| :type delegate_to: str |
| :param labels: a dictionary containing labels for the table, passed to BigQuery |
| |
| **Example (with schema JSON in GCS)**: :: |
| |
|     CreateTable = BigQueryCreateEmptyTableOperator( |
|         task_id='BigQueryCreateEmptyTableOperator_task', |
|         dataset_id='ODS', |
|         table_id='Employees', |
|         project_id='internal-gcp-project', |
|         gcs_schema_object='gs://schema-bucket/employee_schema.json', |
|         bigquery_conn_id='airflow-service-account', |
|         google_cloud_storage_conn_id='airflow-service-account' |
|     ) |
| |
| **Corresponding Schema file** (``employee_schema.json``): :: |
| |
|     [ |
|         { |
|             "mode": "NULLABLE", |
|             "name": "emp_name", |
|             "type": "STRING" |
|         }, |
|         { |
|             "mode": "REQUIRED", |
|             "name": "salary", |
|             "type": "INTEGER" |
|         } |
|     ] |
| |
| **Example (with schema in the DAG)**: :: |
| |
|     CreateTable = BigQueryCreateEmptyTableOperator( |
|         task_id='BigQueryCreateEmptyTableOperator_task', |
|         dataset_id='ODS', |
|         table_id='Employees', |
|         project_id='internal-gcp-project', |
|         schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, |
|                        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}], |
|         bigquery_conn_id='airflow-service-account', |
|         google_cloud_storage_conn_id='airflow-service-account' |
|     ) |
| |
| :type labels: dict |
| :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). |
| **Example**: :: |
| |
|     encryption_configuration = { |
|         "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" |
|     } |
| |
| :type encryption_configuration: dict |
| |
| .. attribute:: template_fields |
| :annotation: = ['dataset_id', 'table_id', 'project_id', 'gcs_schema_object', 'labels'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f0eee4 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. py:class:: BigQueryCreateExternalTableOperator(bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, source_format='CSV', compression='NONE', skip_leading_rows=0, field_delimiter=',', max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, src_fmt_configs=None, labels=None, encryption_configuration=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| Creates a new external table in the dataset with the data in Google Cloud |
| Storage. |
| |
| The schema to be used for the BigQuery table may be specified in one of |
| two ways. You may either directly pass the schema fields in, or you may |
| point the operator to a Google Cloud Storage object name. The object in |
| Google Cloud Storage must be a JSON file with the schema fields in it. |
| |
| :param bucket: The bucket to point the external table to. (templated) |
| :type bucket: str |
| :param source_objects: List of Google Cloud Storage URIs to point the |
| table to. (templated) |
| If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI. |
| :type source_objects: list |
| :param destination_project_dataset_table: The dotted ``(<project>.)<dataset>.<table>`` |
| BigQuery table to load data into (templated). If ``<project>`` is not included, |
| project will be the project defined in the connection json. |
| :type destination_project_dataset_table: str |
| :param schema_fields: If set, the schema field list as defined here: |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema |
| |
| **Example**: :: |
| |
| schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, |
| {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}] |
| |
| Should not be set when source_format is 'DATASTORE_BACKUP'. |
| :type schema_fields: list |
| :param schema_object: If set, a GCS object path pointing to a .json file that |
| contains the schema for the table. (templated) |
| :type schema_object: str |
| :param source_format: File format of the data. |
| :type source_format: str |
| :param compression: [Optional] The compression type of the data source. |
| Possible values include GZIP and NONE. |
| The default value is NONE. |
| This setting is ignored for Google Cloud Bigtable, |
| Google Cloud Datastore backups and Avro formats. |
| :type compression: str |
| :param skip_leading_rows: Number of rows to skip when loading from a CSV. |
| :type skip_leading_rows: int |
| :param field_delimiter: The delimiter to use for the CSV. |
| :type field_delimiter: str |
| :param max_bad_records: The maximum number of bad records that BigQuery can |
| ignore when running the job. |
| :type max_bad_records: int |
| :param quote_character: The value that is used to quote data sections in a CSV file. |
| :type quote_character: str |
| :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). |
| :type allow_quoted_newlines: bool |
| :param allow_jagged_rows: Accept rows that are missing trailing optional columns. |
| The missing values are treated as nulls. If false, records with missing trailing |
| columns are treated as bad records, and if there are too many bad records, an |
| invalid error is returned in the job result. Only applicable to CSV, ignored |
| for other formats. |
| :type allow_jagged_rows: bool |
| :param bigquery_conn_id: Reference to a specific BigQuery hook. |
| :type bigquery_conn_id: str |
| :param google_cloud_storage_conn_id: Reference to a specific Google |
| Cloud Storage hook. |
| :type google_cloud_storage_conn_id: str |
| :param delegate_to: The account to impersonate, if any. For this to |
| work, the service account making the request must have domain-wide |
| delegation enabled. |
| :type delegate_to: str |
| :param src_fmt_configs: configure optional fields specific to the source format |
| :type src_fmt_configs: dict |
| :param labels: a dictionary containing labels for the table, passed to BigQuery |
| :type labels: dict |
| :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys). |
| **Example**: :: |
| |
|     encryption_configuration = { |
|         "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key" |
|     } |
| |
| :type encryption_configuration: dict |
| |
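| **Example** (a minimal usage sketch; the bucket, object path, and table names |
| below are illustrative): :: |
| |
|     create_external_table = BigQueryCreateExternalTableOperator( |
|         task_id='create_external_table', |
|         bucket='my-data-bucket', |
|         source_objects=['exports/sales/*.csv'], |
|         destination_project_dataset_table='my-project.my_dataset.external_sales', |
|         source_format='CSV', |
|         skip_leading_rows=1, |
|         bigquery_conn_id='bigquery_default', |
|         google_cloud_storage_conn_id='google_cloud_default', |
|         dag=dag) |
| |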
| .. attribute:: template_fields |
| :annotation: = ['bucket', 'source_objects', 'schema_object', 'destination_project_dataset_table', 'labels'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f0eee4 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. py:class:: BigQueryDeleteDatasetOperator(dataset_id, delete_contents=False, project_id=None, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| This operator deletes an existing dataset from your project in BigQuery. |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete |
| |
| :param project_id: The project id of the dataset. |
| :type project_id: str |
| :param dataset_id: The dataset to be deleted. |
| :type dataset_id: str |
| :param delete_contents: (Optional) Whether to force the deletion even if the dataset is not empty. |
| Will delete all tables (if any) in the dataset if set to True. |
| Will raise HttpError 400: "{dataset_id} is still in use" if set to False and dataset is not empty. |
| The default value is False. |
| :type delete_contents: bool |
| :param bigquery_conn_id: The connection ID used to connect to Google Cloud Platform. |
| :type bigquery_conn_id: str |
| |
| **Example**: :: |
| |
|     delete_temp_data = BigQueryDeleteDatasetOperator( |
|         dataset_id='temp-dataset', |
|         project_id='temp-project', |
|         delete_contents=True,  # Force the deletion of the dataset as well as its tables (if any). |
|         bigquery_conn_id='_my_gcp_conn_', |
|         task_id='Deletetemp', |
|         dag=dag) |
| |
| .. attribute:: template_fields |
| :annotation: = ['dataset_id', 'project_id'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f00004 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. py:class:: BigQueryCreateEmptyDatasetOperator(dataset_id, project_id=None, dataset_reference=None, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| This operator is used to create a new dataset for your project in BigQuery. |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| |
| :param project_id: The name of the project where we want to create the dataset. |
| Not needed if projectId is set in dataset_reference. |
| :type project_id: str |
| :param dataset_id: The id of the dataset. Not needed if datasetId |
| is set in dataset_reference. |
| :type dataset_id: str |
| :param dataset_reference: Dataset reference that could be provided with request body. |
| More info: |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| :type dataset_reference: dict |
| |
| **Example**: :: |
| |
|     create_new_dataset = BigQueryCreateEmptyDatasetOperator( |
|         dataset_id='new-dataset', |
|         project_id='my-project', |
|         dataset_reference={"friendlyName": "New Dataset"}, |
|         bigquery_conn_id='_my_gcp_conn_', |
|         task_id='newDatasetCreator', |
|         dag=dag) |
| |
| .. attribute:: template_fields |
| :annotation: = ['dataset_id', 'project_id'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f0eee4 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. py:class:: BigQueryGetDatasetOperator(dataset_id, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| This operator is used to return the dataset specified by dataset_id. |
| |
| :param dataset_id: The id of the dataset to fetch. |
| :type dataset_id: str |
| :param project_id: The name of the project the dataset belongs to. |
| :type project_id: str |
| :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. |
| :type gcp_conn_id: str |
| :rtype: dataset |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| |
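| **Example** (a minimal usage sketch; the ids are illustrative): :: |
| |
|     get_dataset = BigQueryGetDatasetOperator( |
|         task_id='get_dataset', |
|         dataset_id='my_dataset', |
|         project_id='my-project', |
|         gcp_conn_id='google_cloud_default', |
|         dag=dag) |
| |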
| .. attribute:: template_fields |
| :annotation: = ['dataset_id', 'project_id'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f0eee4 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. py:class:: BigQueryPatchDatasetOperator(dataset_id, dataset_resource, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| This operator is used to patch a dataset for your project in BigQuery. |
| It only replaces fields that are provided in the submitted dataset resource. |
| |
| :param dataset_id: The id of the dataset to patch. |
| :type dataset_id: str |
| :param dataset_resource: Dataset resource that will be provided with request body. |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| :type dataset_resource: dict |
| :param project_id: The name of the project the dataset belongs to. |
| :type project_id: str |
| :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. |
| :type gcp_conn_id: str |
| :rtype: dataset |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| |
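| **Example** (a minimal usage sketch; the ids are illustrative and the |
| resource only needs the fields being patched): :: |
| |
|     patch_dataset = BigQueryPatchDatasetOperator( |
|         task_id='patch_dataset', |
|         dataset_id='my_dataset', |
|         project_id='my-project', |
|         dataset_resource={'description': 'Updated description only'}, |
|         dag=dag) |
| |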
| .. attribute:: template_fields |
| :annotation: = ['dataset_id', 'project_id'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f0eee4 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. py:class:: BigQueryUpdateDatasetOperator(dataset_id, dataset_resource, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| This operator is used to update a dataset for your project in BigQuery. |
| The update method replaces the entire dataset resource, whereas the patch |
| method only replaces fields that are provided in the submitted dataset resource. |
| |
| :param dataset_id: The id of the dataset to update. |
| :type dataset_id: str |
| :param dataset_resource: Dataset resource that will be provided with request body. |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| :type dataset_resource: dict |
| :param project_id: The name of the project the dataset belongs to. |
| :type project_id: str |
| :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. |
| :type gcp_conn_id: str |
| :rtype: dataset |
| https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource |
| |
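| **Example** (a minimal usage sketch; the ids are illustrative, and since |
| update replaces the whole resource, include every field you want to keep): :: |
| |
|     update_dataset = BigQueryUpdateDatasetOperator( |
|         task_id='update_dataset', |
|         dataset_id='my_dataset', |
|         project_id='my-project', |
|         dataset_resource={'friendlyName': 'My dataset', |
|                           'description': 'Replaces the entire dataset resource'}, |
|         dag=dag) |
| |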
| .. attribute:: template_fields |
| :annotation: = ['dataset_id', 'project_id'] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: = #f0eee4 |
| |
| |
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |