| :py:mod:`airflow.providers.google.cloud.operators.dataflow` |
| =========================================================== |
| |
| .. py:module:: airflow.providers.google.cloud.operators.dataflow |
| |
| .. autoapi-nested-parse:: |
| |
| This module contains Google Dataflow operators. |
| |
| |
| |
| Module Contents |
| --------------- |
| |
| Classes |
| ~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.providers.google.cloud.operators.dataflow.CheckJobRunning |
| airflow.providers.google.cloud.operators.dataflow.DataflowConfiguration |
| airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator |
| airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator |
| airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator |
| airflow.providers.google.cloud.operators.dataflow.DataflowStartSqlJobOperator |
| airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator |
| |
| |
| |
| |
| .. py:class:: CheckJobRunning |
| |
| Bases: :py:obj:`enum.Enum` |
| |
| Helper enum for choosing what to do if a job is already running:
|
| * ``IgnoreJob`` - do not check if a job is already running,
| * ``FinishIfRunning`` - finish the current DAG run with no further action,
| * ``WaitForRun`` - wait for the running job to finish and then start the new job.
| |
| .. py:attribute:: IgnoreJob |
| :annotation: = 1 |
| |
| |
| |
| .. py:attribute:: FinishIfRunning |
| :annotation: = 2 |
| |
| |
| |
| .. py:attribute:: WaitForRun |
| :annotation: = 3 |
| |
| |
| |
| |
| .. py:class:: DataflowConfiguration(*, job_name = '{{task.task_id}}', append_job_name = True, project_id = None, location = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id = 'google_cloud_default', delegate_to = None, poll_sleep = 10, impersonation_chain = None, drain_pipeline = False, cancel_timeout = 5 * 60, wait_until_finished = None, multiple_jobs = None, check_if_running = CheckJobRunning.WaitForRun, service_account = None) |
| |
| Dataflow configuration that can be passed to |
| :py:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator` and |
| :py:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`. |
| |
| :param job_name: The 'jobName' to use when executing the Dataflow job |
| (templated). This ends up being set in the pipeline options, so any entry |
| with key ``'jobName'`` or ``'job_name'`` in ``options`` will be overwritten.
| :param append_job_name: True if a unique suffix has to be appended to the job name.
| :param project_id: Optional, the Google Cloud project ID in which to start a job. |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param location: Job location. |
| :param gcp_conn_id: The connection ID to use connecting to Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param poll_sleep: The time in seconds to sleep between polling Google |
| Cloud Platform for the dataflow job status while the job is in the |
| JOB_STATE_RUNNING state. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| :param drain_pipeline: Optional, set to True if you want to stop a streaming job by draining it
| instead of canceling it when the task instance is killed. See:
| https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
| :param cancel_timeout: (Optional) How long (in seconds) the operator should wait for the pipeline
| to be successfully cancelled when the task is killed. Defaults to 300 seconds (5 * 60).
| :param wait_until_finished: (Optional) |
| If True, wait for the end of pipeline execution before exiting. |
| If False, only submits job. |
| If None, default behavior. |
| |
| The default behavior depends on the type of pipeline: |
| |
| * for the streaming pipeline, wait for jobs to start, |
| * for the batch pipeline, wait for the jobs to complete. |
| |
| .. warning:: |
| |
| You cannot call the ``PipelineResult.wait_until_finish`` method in your pipeline code if you want
| the operator to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline
| will always wait until it finishes. For more information, look at:
| `Asynchronous execution |
| <https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_10>`__ |
| |
| The process of starting the Dataflow job in Airflow consists of two steps:
|
| * running a subprocess and reading the stdout/stderr logs for the job id,
| * a loop that waits for the end of the job identified by the job id obtained in the previous step.
| This loop checks the status of the job.
|
| Step two starts just after step one has finished, so if you call ``wait_until_finish`` in your
| pipeline code, step two will not start until that process stops. When the process stops,
| step two will run, but it will only execute one iteration as the job will already be in a terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline but pass
| ``wait_until_finished=True`` to the operator, the second loop will wait for the job's terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline and pass
| ``wait_until_finished=False`` to the operator, the second loop will check once whether the job
| is in a terminal state and exit the loop.
| :param multiple_jobs: If pipeline creates multiple jobs then monitor all jobs. Supported only by |
| :py:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator` |
| :param check_if_running: Before running the job, validate that a previous run is not in progress.
| IgnoreJob = do not check if running.
| FinishIfRunning = if the job is running, finish with no further action.
| WaitForRun = wait until the job finishes and then run the new job.
| Supported only by: |
| :py:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator` |
| :param service_account: Run the job as a specific service account, instead of the default GCE robot. |
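|
| For example, a minimal sketch of passing this configuration to the Beam operator through its
| ``dataflow_config`` argument (the bucket path and the pipeline file are hypothetical):
|
| .. code-block:: python
|
|     from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
|     from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning, DataflowConfiguration
|
|     start_python_pipeline = BeamRunPythonPipelineOperator(
|         task_id="start_python_pipeline",
|         runner="DataflowRunner",
|         py_file="gs://my-bucket/pipelines/wordcount.py",  # hypothetical pipeline file
|         pipeline_options={"tempLocation": "gs://my-bucket/tmp/"},
|         dataflow_config=DataflowConfiguration(
|             job_name="{{ task.task_id }}",
|             location="europe-west1",
|             wait_until_finished=True,
|             check_if_running=CheckJobRunning.IgnoreJob,
|         ),
|     )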
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['job_name', 'location'] |
| |
| |
| |
| |
| .. py:class:: DataflowCreateJavaJobOperator(*, jar, job_name = '{{task.task_id}}', dataflow_default_options = None, options = None, project_id = None, location = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id = 'google_cloud_default', delegate_to = None, poll_sleep = 10, job_class = None, check_if_running = CheckJobRunning.WaitForRun, multiple_jobs = False, cancel_timeout = 10 * 60, wait_until_finished = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Start a Java Cloud Dataflow batch job. The parameters of the operation |
| will be passed to the job. |
| |
| This class is deprecated. |
| Please use `providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`. |
| |
| **Example**: :: |
| |
| default_args = {
|     "owner": "airflow",
|     "depends_on_past": False,
|     "start_date": datetime(2016, 8, 1),
|     "email": ["alex@vanboxel.be"],
|     "email_on_failure": False,
|     "email_on_retry": False,
|     "retries": 1,
|     "retry_delay": timedelta(minutes=30),
|     "dataflow_default_options": {
|         "project": "my-gcp-project",
|         "zone": "us-central1-f",
|         "stagingLocation": "gs://bucket/tmp/dataflow/staging/",
|     },
| }
|
| dag = DAG("test-dag", default_args=default_args)
|
| task = DataflowCreateJavaJobOperator(
|     gcp_conn_id="gcp_default",
|     task_id="normalize-cal",
|     jar="{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar",
|     options={
|         "autoscalingAlgorithm": "BASIC",
|         "maxNumWorkers": "50",
|         "start": "{{ds}}",
|         "partitionType": "DAY",
|     },
|     dag=dag,
| )
| |
| |
| .. seealso:: |
| For more detail on job submission have a look at the reference: |
| https://cloud.google.com/dataflow/pipelines/specifying-exec-params |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:DataflowCreateJavaJobOperator` |
| |
| :param jar: The reference to a self-executing Dataflow jar (templated).
| :param job_name: The 'jobName' to use when executing the Dataflow job |
| (templated). This ends up being set in the pipeline options, so any entry |
| with key ``'jobName'`` in ``options`` will be overwritten. |
| :param dataflow_default_options: Map of default job options. |
| :param options: Map of job specific options. It must be a dictionary;
| the values can be of different types:
|
| * If the value is None, the single option ``--key`` (without a value) will be added.
| * If the value is False, this option will be skipped.
| * If the value is True, the single option ``--key`` (without a value) will be added.
| * If the value is a list, one option will be added for each element, e.g. if the value is
| ``['A', 'B']`` and the key is ``key``, then the options ``--key=A --key=B`` will be added.
| * Other value types will be replaced with their Python textual representation.
| |
| When defining labels (``labels`` option), you can also provide a dictionary. |
| :param project_id: Optional, the Google Cloud project ID in which to start a job. |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param location: Job location. |
| :param gcp_conn_id: The connection ID to use connecting to Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param poll_sleep: The time in seconds to sleep between polling Google |
| Cloud Platform for the dataflow job status while the job is in the |
| JOB_STATE_RUNNING state. |
| :param job_class: The name of the Dataflow job class to be executed; it
| is often not the main class configured in the Dataflow jar file.
| :param multiple_jobs: If the pipeline creates multiple jobs then monitor all of them.
| :param check_if_running: Before running the job, validate that a previous run is not in progress.
| IgnoreJob = do not check if running, FinishIfRunning = if the job is running finish with no
| further action, WaitForRun = wait until the job finishes and then run the new job.
| Note that ``jar``, ``options``, and ``job_name`` are templated, so you can use variables in them.
| :param cancel_timeout: How long (in seconds) the operator should wait for the pipeline to be
| successfully cancelled when the task is killed.
| :param wait_until_finished: (Optional) |
| If True, wait for the end of pipeline execution before exiting. |
| If False, only submits job. |
| If None, default behavior. |
| |
| The default behavior depends on the type of pipeline: |
| |
| * for the streaming pipeline, wait for jobs to start, |
| * for the batch pipeline, wait for the jobs to complete. |
| |
| .. warning:: |
| |
| You cannot call the ``PipelineResult.wait_until_finish`` method in your pipeline code if you want
| the operator to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline
| will always wait until it finishes. For more information, look at:
| `Asynchronous execution |
| <https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_10>`__ |
| |
| The process of starting the Dataflow job in Airflow consists of two steps:
|
| * running a subprocess and reading the stdout/stderr logs for the job id,
| * a loop that waits for the end of the job identified by the job id obtained in the previous step.
| This loop checks the status of the job.
|
| Step two starts just after step one has finished, so if you call ``wait_until_finish`` in your
| pipeline code, step two will not start until that process stops. When the process stops,
| step two will run, but it will only execute one iteration as the job will already be in a terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline but pass
| ``wait_until_finished=True`` to the operator, the second loop will wait for the job's terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline and pass
| ``wait_until_finished=False`` to the operator, the second loop will check once whether the job
| is in a terminal state and exit the loop.
| |
| Note that both
| ``dataflow_default_options`` and ``options`` will be merged to specify the pipeline
| execution parameters, and ``dataflow_default_options`` is expected to contain
| high-level options, for instance, project and zone information, which
| apply to all Dataflow operators in the DAG.
| |
| It's good practice to define ``dataflow_*`` parameters such as the project, zone and staging
| location in the ``default_args`` of the DAG.
| |
| .. code-block:: python |
| |
| default_args = {
|     "dataflow_default_options": {
|         "zone": "europe-west1-d",
|         "stagingLocation": "gs://my-staging-bucket/staging/",
|     }
| }
| |
| You need to pass the path to your Dataflow jar as a file reference with the ``jar``
| parameter; the jar needs to be a self-executing jar (see the documentation here:
| https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar).
| Use ``options`` to pass on options to your job.
| |
| .. code-block:: python |
| |
| t1 = DataflowCreateJavaJobOperator(
|     task_id="dataflow_example",
|     jar="{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar",
|     options={
|         "autoscalingAlgorithm": "BASIC",
|         "maxNumWorkers": "50",
|         "start": "{{ds}}",
|         "partitionType": "DAY",
|         "labels": {"foo": "bar"},
|     },
|     gcp_conn_id="airflow-conn-id",
|     dag=my_dag,
| )
| |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['options', 'jar', 'job_name'] |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #0273d4 |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| Execute the Apache Beam Pipeline. |
| |
| |
| .. py:method:: on_kill(self) |
| |
| Override this method to clean up subprocesses when a task instance
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up or it will leave |
| ghost processes behind. |
| |
| |
| |
| .. py:class:: DataflowTemplatedJobStartOperator(*, template, job_name = '{{task.task_id}}', options = None, dataflow_default_options = None, parameters = None, project_id = None, location = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id = 'google_cloud_default', delegate_to = None, poll_sleep = 10, impersonation_chain = None, environment = None, cancel_timeout = 10 * 60, wait_until_finished = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Start a Templated Cloud Dataflow job. The parameters of the operation |
| will be passed to the job. |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:DataflowTemplatedJobStartOperator` |
| |
| :param template: The reference to the Dataflow template. |
| :param job_name: The 'jobName' to use when executing the Dataflow template |
| (templated). |
| :param options: Map of job runtime environment options.
| If passed, it will update the ``environment`` argument.
| |
| .. seealso::
| For more information on possible configurations, look at the API documentation
| `RuntimeEnvironment
| <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
| |
| :param dataflow_default_options: Map of default job environment options. |
| :param parameters: Map of job specific parameters for the template. |
| :param project_id: Optional, the Google Cloud project ID in which to start a job. |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param location: Job location. |
| :param gcp_conn_id: The connection ID to use connecting to Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param poll_sleep: The time in seconds to sleep between polling Google |
| Cloud Platform for the dataflow job status while the job is in the |
| JOB_STATE_RUNNING state. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| :param environment: Optional, Map of job runtime environment options. |
| |
| .. seealso::
| For more information on possible configurations, look at the API documentation
| `RuntimeEnvironment
| <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
| :param cancel_timeout: How long (in seconds) the operator should wait for the pipeline to be
| successfully cancelled when the task is killed.
| :param wait_until_finished: (Optional) |
| If True, wait for the end of pipeline execution before exiting. |
| If False, only submits job. |
| If None, default behavior. |
| |
| The default behavior depends on the type of pipeline: |
| |
| * for the streaming pipeline, wait for jobs to start, |
| * for the batch pipeline, wait for the jobs to complete. |
| |
| .. warning:: |
| |
| You cannot call the ``PipelineResult.wait_until_finish`` method in your pipeline code if you want
| the operator to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline
| will always wait until it finishes. For more information, look at:
| `Asynchronous execution |
| <https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_10>`__ |
| |
| The process of starting the Dataflow job in Airflow consists of two steps:
|
| * running a subprocess and reading the stdout/stderr logs for the job id,
| * a loop that waits for the end of the job identified by the job id obtained in the previous step.
| This loop checks the status of the job.
|
| Step two starts just after step one has finished, so if you call ``wait_until_finish`` in your
| pipeline code, step two will not start until that process stops. When the process stops,
| step two will run, but it will only execute one iteration as the job will already be in a terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline but pass
| ``wait_until_finished=True`` to the operator, the second loop will wait for the job's terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline and pass
| ``wait_until_finished=False`` to the operator, the second loop will check once whether the job
| is in a terminal state and exit the loop.
| |
| It's good practice to define ``dataflow_*`` parameters such as the project, zone and staging
| location in the ``default_args`` of the DAG.
| |
| .. seealso:: |
| https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters |
| https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment |
| |
| .. code-block:: python |
| |
| default_args = {
|     "dataflow_default_options": {
|         "zone": "europe-west1-d",
|         "tempLocation": "gs://my-staging-bucket/staging/",
|     }
| }
| |
| You need to pass the path to your dataflow template as a file reference with the |
| ``template`` parameter. Use ``parameters`` to pass on parameters to your job. |
| Use ``environment`` to pass on runtime environment variables to your job. |
| |
| .. code-block:: python |
| |
| t1 = DataflowTemplatedJobStartOperator(
|     task_id="dataflow_example",
|     template="{{var.value.gcp_dataflow_base}}",
|     parameters={
|         "inputFile": "gs://bucket/input/my_input.txt",
|         "outputFile": "gs://bucket/output/my_output.txt",
|     },
|     gcp_conn_id="airflow-conn-id",
|     dag=my_dag,
| )
| |
| ``template``, ``dataflow_default_options``, ``parameters``, and ``job_name`` are |
| templated so you can use variables in them. |
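|
| As a sketch (not an exhaustive reference), runtime settings can also be supplied through the
| ``environment`` argument; the template path below is one of the Google-provided templates and the
| bucket names are illustrative only:
|
| .. code-block:: python
|
|     t2 = DataflowTemplatedJobStartOperator(
|         task_id="dataflow_environment_example",
|         template="gs://dataflow-templates/latest/Word_Count",
|         parameters={
|             "inputFile": "gs://bucket/input/my_input.txt",
|             "output": "gs://bucket/output/my_output",
|         },
|         environment={
|             "tempLocation": "gs://bucket/tmp/",
|             "machineType": "n1-standard-2",
|         },
|         location="europe-west1",
|         dag=my_dag,
|     )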
| |
| Note that ``dataflow_default_options`` is expected to contain high-level options,
| such as project information, which apply to all Dataflow operators in the DAG.
| |
| .. seealso::
| https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters
| https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
|
| For more detail on job template execution have a look at the reference:
| https://cloud.google.com/dataflow/docs/templates/executing-templates
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['template', 'job_name', 'options', 'parameters', 'project_id', 'location', 'gcp_conn_id',... |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #0273d4 |
| |
| |
| |
| .. py:attribute:: operator_extra_links |
| |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| .. py:method:: on_kill(self) |
| |
| Override this method to clean up subprocesses when a task instance
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up or it will leave |
| ghost processes behind. |
| |
| |
| |
| .. py:class:: DataflowStartFlexTemplateOperator(body, location, project_id = None, gcp_conn_id = 'google_cloud_default', delegate_to = None, drain_pipeline = False, cancel_timeout = 10 * 60, wait_until_finished = None, *args, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Starts a Dataflow Flex Template job.
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:DataflowStartFlexTemplateOperator` |
| |
| :param body: The request body. See: |
| https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body |
| :param location: The location of the Dataflow job (for example europe-west1) |
| :param project_id: The ID of the GCP project that owns the job. |
| If set to ``None`` or missing, the default project_id from the GCP connection is used. |
| :param gcp_conn_id: The connection ID to use connecting to Google Cloud |
| Platform. |
| :param delegate_to: The account to impersonate, if any. |
| For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param drain_pipeline: Optional, set to True if you want to stop a streaming job by draining it
| instead of canceling it when the task instance is killed. See:
| https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
| :param cancel_timeout: How long (in seconds) the operator should wait for the pipeline to be
| successfully cancelled when the task is killed.
| :param wait_until_finished: (Optional) |
| If True, wait for the end of pipeline execution before exiting. |
| If False, only submits job. |
| If None, default behavior. |
| |
| The default behavior depends on the type of pipeline: |
| |
| * for the streaming pipeline, wait for jobs to start, |
| * for the batch pipeline, wait for the jobs to complete. |
| |
| .. warning:: |
| |
| You cannot call the ``PipelineResult.wait_until_finish`` method in your pipeline code if you want
| the operator to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline
| will always wait until it finishes. For more information, look at:
| `Asynchronous execution |
| <https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_10>`__ |
| |
| The process of starting the Dataflow job in Airflow consists of two steps:
|
| * running a subprocess and reading the stdout/stderr logs for the job id,
| * a loop that waits for the end of the job identified by the job id obtained in the previous step.
| This loop checks the status of the job.
|
| Step two starts just after step one has finished, so if you call ``wait_until_finish`` in your
| pipeline code, step two will not start until that process stops. When the process stops,
| step two will run, but it will only execute one iteration as the job will already be in a terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline but pass
| ``wait_until_finished=True`` to the operator, the second loop will wait for the job's terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline and pass
| ``wait_until_finished=False`` to the operator, the second loop will check once whether the job
| is in a terminal state and exit the loop.
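|
| A minimal sketch of the request ``body`` passed to the operator; the bucket, subscription and
| table names are hypothetical and only illustrate the ``launchParameter`` structure:
|
| .. code-block:: python
|
|     start_flex_template = DataflowStartFlexTemplateOperator(
|         task_id="start_flex_template",
|         location="europe-west1",
|         body={
|             "launchParameter": {
|                 "jobName": "my-flex-template-job",
|                 "containerSpecGcsPath": "gs://my-bucket/templates/my-template.json",
|                 "parameters": {
|                     "inputSubscription": "projects/my-project/subscriptions/my-subscription",
|                     "outputTable": "my-project:my_dataset.my_table",
|                 },
|             }
|         },
|     )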
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['body', 'location', 'project_id', 'gcp_conn_id'] |
| |
| |
| |
| .. py:attribute:: operator_extra_links |
| |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| .. py:method:: on_kill(self) |
| |
| Override this method to clean up subprocesses when a task instance
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up or it will leave |
| ghost processes behind. |
| |
| |
| |
| .. py:class:: DataflowStartSqlJobOperator(job_name, query, options, location = DEFAULT_DATAFLOW_LOCATION, project_id = None, gcp_conn_id = 'google_cloud_default', delegate_to = None, drain_pipeline = False, *args, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Starts a Dataflow SQL query.
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:DataflowStartSqlJobOperator` |
| |
| .. warning::
| This operator requires the ``gcloud`` command (Google Cloud SDK) to be installed on the
| Airflow worker: `Installing the Google Cloud SDK <https://cloud.google.com/sdk/docs/install>`__
| |
| :param job_name: The unique name to assign to the Cloud Dataflow job. |
| :param query: The SQL query to execute. |
| :param options: Job parameters to be executed. It can be a dictionary whose keys correspond to
| the options of the ``gcloud dataflow sql query`` command.
|
| For more information, look at the
| `gcloud beta dataflow sql query
| <https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query>`__
| command reference.
| |
| :param location: The location of the Dataflow job (for example europe-west1) |
| :param project_id: The ID of the GCP project that owns the job. |
| If set to ``None`` or missing, the default project_id from the GCP connection is used. |
| :param gcp_conn_id: The connection ID to use connecting to Google Cloud |
| Platform. |
| :param delegate_to: The account to impersonate, if any. |
| For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param drain_pipeline: Optional, set to True if you want to stop a streaming job by draining it
| instead of canceling it when the task instance is killed. See:
| https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
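|
| A minimal sketch of starting a Dataflow SQL query; the project, dataset and table names are
| hypothetical and the ``options`` keys follow the ``gcloud dataflow sql query`` flags:
|
| .. code-block:: python
|
|     start_sql_job = DataflowStartSqlJobOperator(
|         task_id="start_sql_job",
|         job_name="sample-dataflow-sql-job",
|         query="""
|             SELECT sales_region, COUNT(*) AS total_sales
|             FROM bigquery.table.`my-project`.`my_dataset`.`my_input_table`
|             GROUP BY sales_region
|         """,
|         options={
|             "bigquery-project": "my-project",
|             "bigquery-dataset": "my_dataset",
|             "bigquery-table": "my_output_table",
|         },
|         location="europe-west1",
|     )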
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['job_name', 'query', 'options', 'location', 'project_id', 'gcp_conn_id'] |
| |
| |
| |
| .. py:attribute:: template_fields_renderers |
| |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| .. py:method:: on_kill(self) |
| |
| Override this method to clean up subprocesses when a task instance
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up or it will leave |
| ghost processes behind. |
| |
| |
| |
| .. py:class:: DataflowCreatePythonJobOperator(*, py_file, job_name = '{{task.task_id}}', dataflow_default_options = None, options = None, py_interpreter = 'python3', py_options = None, py_requirements = None, py_system_site_packages = False, project_id = None, location = DEFAULT_DATAFLOW_LOCATION, gcp_conn_id = 'google_cloud_default', delegate_to = None, poll_sleep = 10, drain_pipeline = False, cancel_timeout = 10 * 60, wait_until_finished = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Launches Cloud Dataflow jobs written in Python. Note that both
| ``dataflow_default_options`` and ``options`` will be merged to specify the pipeline
| execution parameters, and ``dataflow_default_options`` is expected to contain
| high-level options, for instance, project and zone information, which
| apply to all Dataflow operators in the DAG.
| |
| This class is deprecated. |
| Please use `providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`. |
| |
| .. seealso:: |
| For more detail on job submission have a look at the reference: |
| https://cloud.google.com/dataflow/pipelines/specifying-exec-params |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:DataflowCreatePythonJobOperator` |
| |
| :param py_file: Reference to the python dataflow pipeline file.py, e.g., |
| /some/local/file/path/to/your/python/pipeline/file. (templated) |
| :param job_name: The 'job_name' to use when executing the Dataflow job |
| (templated). This ends up being set in the pipeline options, so any entry |
| with key ``'jobName'`` or ``'job_name'`` in ``options`` will be overwritten. |
| :param py_options: Additional python options, e.g., ["-m", "-v"]. |
| :param dataflow_default_options: Map of default job options. |
| :param options: Map of job specific options. It must be a dictionary;
| the values can be of different types:
|
| * If the value is None, the single option ``--key`` (without a value) will be added.
| * If the value is False, this option will be skipped.
| * If the value is True, the single option ``--key`` (without a value) will be added.
| * If the value is a list, one option will be added for each element, e.g. if the value is
| ``['A', 'B']`` and the key is ``key``, then the options ``--key=A --key=B`` will be added.
| * Other value types will be replaced with their Python textual representation.
| |
| When defining labels (``labels`` option), you can also provide a dictionary. |
| :param py_interpreter: Python version of the Beam pipeline.
| If None, this defaults to ``python3``.
| To track the Python versions supported by Beam and related
| issues, check: https://issues.apache.org/jira/browse/BEAM-1251
| :param py_requirements: Additional Python package(s) to install.
| If a value is passed to this parameter, a new virtual environment will be created with the
| additional packages installed.
| |
| You could also install the apache_beam package if it is not installed on your system or you want |
| to use a different version. |
| :param py_system_site_packages: Whether to include system_site_packages in your virtualenv. |
| See virtualenv documentation for more information. |
| |
| This option is only relevant if the ``py_requirements`` parameter is not None. |
| :param gcp_conn_id: The connection ID to use connecting to Google Cloud. |
| :param project_id: Optional, the Google Cloud project ID in which to start a job. |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param location: Job location. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param poll_sleep: The time in seconds to sleep between polling Google |
| Cloud Platform for the dataflow job status while the job is in the |
| JOB_STATE_RUNNING state. |
| :param drain_pipeline: Optional, set to True if you want to stop a streaming job by draining it
| instead of canceling it when the task instance is killed. See:
| https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
| :param cancel_timeout: How long (in seconds) the operator should wait for the pipeline to be
| successfully cancelled when the task is killed.
| :param wait_until_finished: (Optional) |
| If True, wait for the end of pipeline execution before exiting. |
| If False, only submits job. |
| If None, default behavior. |
| |
| The default behavior depends on the type of pipeline: |
| |
| * for the streaming pipeline, wait for jobs to start, |
| * for the batch pipeline, wait for the jobs to complete. |
| |
| .. warning:: |
| |
| You cannot call the ``PipelineResult.wait_until_finish`` method in your pipeline code if you want
| the operator to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline
| will always wait until it finishes. For more information, look at:
| `Asynchronous execution |
| <https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_10>`__ |
| |
| The process of starting the Dataflow job in Airflow consists of two steps:
|
| * running a subprocess and reading the stdout/stderr logs for the job id,
| * a loop that waits for the end of the job identified by the job id obtained in the previous step.
| This loop checks the status of the job.
|
| Step two starts just after step one has finished, so if you call ``wait_until_finish`` in your
| pipeline code, step two will not start until that process stops. When the process stops,
| step two will run, but it will only execute one iteration as the job will already be in a terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline but pass
| ``wait_until_finished=True`` to the operator, the second loop will wait for the job's terminal state.
|
| If you do not call the ``wait_until_finish`` method in your pipeline and pass
| ``wait_until_finished=False`` to the operator, the second loop will check once whether the job
| is in a terminal state and exit the loop.
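|
| Since this class is deprecated, prefer ``BeamRunPythonPipelineOperator``; the sketch below only
| illustrates the historical usage, with hypothetical bucket paths and project names:
|
| .. code-block:: python
|
|     start_python_job = DataflowCreatePythonJobOperator(
|         task_id="start_python_job",
|         py_file="gs://my-bucket/pipelines/wordcount.py",
|         py_requirements=["apache-beam[gcp]==2.25.0"],
|         py_interpreter="python3",
|         job_name="example-python-dataflow",
|         options={"output": "gs://my-bucket/output/results"},
|         dataflow_default_options={
|             "project": "my-gcp-project",
|             "tempLocation": "gs://my-bucket/tmp/",
|             "stagingLocation": "gs://my-bucket/staging/",
|         },
|         location="europe-west1",
|     )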
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['options', 'dataflow_default_options', 'job_name', 'py_file'] |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| Execute the Python Dataflow job.
| |
| |
| .. py:method:: on_kill(self) |
| |
| Override this method to clean up subprocesses when a task instance
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up or it will leave |
| ghost processes behind. |
| |
| |
| |