| :py:mod:`airflow.providers.google.cloud.operators.pubsub` |
| ========================================================= |
| |
| .. py:module:: airflow.providers.google.cloud.operators.pubsub |
| |
| .. autoapi-nested-parse:: |
| |
| This module contains Google PubSub operators. |
| |
| .. spelling:: |
| |
| MessageStoragePolicy |
| |
| |
| |
| Module Contents |
| --------------- |
| |
| Classes |
| ~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator |
| airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator |
| airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator |
| airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator |
| airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator |
| airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator |
| |
| |
| |
| |
| .. py:class:: PubSubCreateTopicOperator(*, topic, project_id = None, fail_if_exists = False, gcp_conn_id = 'google_cloud_default', delegate_to = None, labels = None, message_storage_policy = None, kms_key_name = None, retry = DEFAULT, timeout = None, metadata = (), impersonation_chain = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Create a PubSub topic. |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:PubSubCreateTopicOperator` |
| |
| By default, if the topic already exists, this operator will |
| not cause the DAG to fail. :: |
| |
| with DAG('successful DAG') as dag: |
| ( |
| PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic') |
| >> PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic') |
| ) |
| |
| The operator can be configured to fail if the topic already exists. :: |
| |
| with DAG('failing DAG') as dag: |
| ( |
| PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic') |
| >> PubSubCreateTopicOperator( |
| project_id='my-project', |
| topic='my_new_topic', |
| fail_if_exists=True, |
| ) |
| ) |
| |
| Both ``project_id`` and ``topic`` are templated so you can use Jinja templating in their values. |
| |
| :param project_id: Optional, the Google Cloud project ID where the topic will be created. |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param topic: the topic to create. Do not include the |
| full topic path. In other words, instead of |
| ``projects/{project}/topics/{topic}``, provide only |
| ``{topic}``. (templated) |
| :param gcp_conn_id: The connection ID to use connecting to |
| Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param labels: Client-assigned labels; see |
| https://cloud.google.com/pubsub/docs/labels |
| :param message_storage_policy: Policy constraining the set |
| of Google Cloud regions where messages published to |
| the topic may be stored. If not present, then no constraints |
| are in effect. |
| Union[Dict, google.cloud.pubsub_v1.types.MessageStoragePolicy] |
| :param kms_key_name: The resource name of the Cloud KMS CryptoKey |
| to be used to protect access to messages published on this topic. |
| The expected format is |
| ``projects/*/locations/*/keyRings/*/cryptoKeys/*``. |
| :param retry: (Optional) A retry object used to retry requests. |
| If None is specified, requests will not be retried. |
| :param timeout: (Optional) The amount of time, in seconds, to wait for the request |
| to complete. Note that if retry is specified, the timeout applies to each |
| individual attempt. |
| :param metadata: (Optional) Additional metadata that is provided to the method. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['project_id', 'topic', 'impersonation_chain'] |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #0273d4 |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |
| .. py:class:: PubSubCreateSubscriptionOperator(*, topic, project_id = None, subscription = None, subscription_project_id = None, ack_deadline_secs = 10, fail_if_exists = False, gcp_conn_id = 'google_cloud_default', delegate_to = None, push_config = None, retain_acked_messages = None, message_retention_duration = None, labels = None, enable_message_ordering = False, expiration_policy = None, filter_ = None, dead_letter_policy = None, retry_policy = None, retry = DEFAULT, timeout = None, metadata = (), impersonation_chain = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Create a PubSub subscription. |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:PubSubCreateSubscriptionOperator` |
| |
| By default, the subscription will be created in ``project_id``. If |
| ``subscription_project_id`` is specified and the Google Cloud credentials allow, the |
| Subscription can be created in a different project from its topic. |
| |
| By default, if the subscription already exists, this operator will |
| not cause the DAG to fail. However, the topic must exist in the project. :: |
| |
| with DAG('successful DAG') as dag: |
| ( |
| PubSubCreateSubscriptionOperator( |
| project_id='my-project', |
| topic='my-topic', |
| subscription='my-subscription' |
| ) |
| >> PubSubCreateSubscriptionOperator( |
| project_id='my-project', |
| topic='my-topic', |
| subscription='my-subscription', |
| ) |
| ) |
| |
| The operator can be configured to fail if the subscription already exists. |
| :: |
| |
| with DAG('failing DAG') as dag: |
| ( |
| PubSubCreateSubscriptionOperator( |
| project_id='my-project', |
| topic='my-topic', |
| subscription='my-subscription', |
| ) |
| >> PubSubCreateSubscriptionOperator( |
| project_id='my-project', |
| topic='my-topic', |
| subscription='my-subscription', |
| fail_if_exists=True, |
| ) |
| ) |
| |
| Finally, subscription is not required. If not passed, the operator will |
| generated a universally unique identifier for the subscription's name. :: |
| |
| with DAG('DAG') as dag: |
| PubSubCreateSubscriptionOperator(project_id='my-project', topic='my-topic') |
| |
| ``project_id``, ``topic``, ``subscription``, ``subscription_project_id`` and |
| ``impersonation_chain`` are templated so you can use Jinja templating in their values. |
| |
| :param project_id: Optional, the Google Cloud project ID where the topic exists. |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param topic: the topic to create. Do not include the |
| full topic path. In other words, instead of |
| ``projects/{project}/topics/{topic}``, provide only |
| ``{topic}``. (templated) |
| :param subscription: the Pub/Sub subscription name. If empty, a random |
| name will be generated using the uuid module |
| :param subscription_project_id: the Google Cloud project ID where the subscription |
| will be created. If empty, ``topic_project`` will be used. |
| :param ack_deadline_secs: Number of seconds that a subscriber has to |
| acknowledge each message pulled from the subscription |
| :param gcp_conn_id: The connection ID to use connecting to |
| Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param push_config: If push delivery is used with this subscription, |
| this field is used to configure it. An empty ``pushConfig`` signifies |
| that the subscriber will pull and ack messages using API methods. |
| :param retain_acked_messages: Indicates whether to retain acknowledged |
| messages. If true, then messages are not expunged from the subscription's |
| backlog, even if they are acknowledged, until they fall out of the |
| ``message_retention_duration`` window. This must be true if you would |
| like to Seek to a timestamp. |
| :param message_retention_duration: How long to retain unacknowledged messages |
| in the subscription's backlog, from the moment a message is published. If |
| ``retain_acked_messages`` is true, then this also configures the |
| retention of acknowledged messages, and thus configures how far back in |
| time a ``Seek`` can be done. Defaults to 7 days. Cannot be more than 7 |
| days or less than 10 minutes. |
| :param labels: Client-assigned labels; see |
| https://cloud.google.com/pubsub/docs/labels |
| :param enable_message_ordering: If true, messages published with the same |
| ordering_key in PubsubMessage will be delivered to the subscribers in the order |
| in which they are received by the Pub/Sub system. Otherwise, they may be |
| delivered in any order. |
| :param expiration_policy: A policy that specifies the conditions for this |
| subscription’s expiration. A subscription is considered active as long as any |
| connected subscriber is successfully consuming messages from the subscription or |
| is issuing operations on the subscription. If expiration_policy is not set, |
| a default policy with ttl of 31 days will be used. The minimum allowed value for |
| expiration_policy.ttl is 1 day. |
| :param filter_: An expression written in the Cloud Pub/Sub filter language. If |
| non-empty, then only PubsubMessages whose attributes field matches the filter are |
| delivered on this subscription. If empty, then no messages are filtered out. |
| :param dead_letter_policy: A policy that specifies the conditions for dead lettering |
| messages in this subscription. If dead_letter_policy is not set, dead lettering is |
| disabled. |
| :param retry_policy: A policy that specifies how Pub/Sub retries message delivery |
| for this subscription. If not set, the default retry policy is applied. This |
| generally implies that messages will be retried as soon as possible for healthy |
| subscribers. RetryPolicy will be triggered on NACKs or acknowledgement deadline |
| exceeded events for a given message. |
| :param retry: (Optional) A retry object used to retry requests. |
| If None is specified, requests will not be retried. |
| :param timeout: (Optional) The amount of time, in seconds, to wait for the request |
| to complete. Note that if retry is specified, the timeout applies to each |
| individual attempt. |
| :param metadata: (Optional) Additional metadata that is provided to the method. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain'] |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #0273d4 |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |
| .. py:class:: PubSubDeleteTopicOperator(*, topic, project_id = None, fail_if_not_exists = False, gcp_conn_id = 'google_cloud_default', delegate_to = None, retry = DEFAULT, timeout = None, metadata = (), impersonation_chain = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Delete a PubSub topic. |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:PubSubDeleteTopicOperator` |
| |
| By default, if the topic does not exist, this operator will |
| not cause the DAG to fail. :: |
| |
| with DAG('successful DAG') as dag: |
| PubSubDeleteTopicOperator(project_id='my-project', topic='non_existing_topic') |
| |
| The operator can be configured to fail if the topic does not exist. :: |
| |
| with DAG('failing DAG') as dag: |
| PubSubDeleteTopicOperator( |
| project_id='my-project', topic='non_existing_topic', fail_if_not_exists=True, |
| ) |
| |
| Both ``project_id`` and ``topic`` are templated so you can use Jinja templating in their values. |
| |
| :param project_id: Optional, the Google Cloud project ID in which to work (templated). |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param topic: the topic to delete. Do not include the |
| full topic path. In other words, instead of |
| ``projects/{project}/topics/{topic}``, provide only |
| ``{topic}``. (templated) |
| :param fail_if_not_exists: If True and the topic does not exist, fail |
| the task |
| :param gcp_conn_id: The connection ID to use connecting to |
| Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param retry: (Optional) A retry object used to retry requests. |
| If None is specified, requests will not be retried. |
| :param timeout: (Optional) The amount of time, in seconds, to wait for the request |
| to complete. Note that if retry is specified, the timeout applies to each |
| individual attempt. |
| :param metadata: (Optional) Additional metadata that is provided to the method. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['project_id', 'topic', 'impersonation_chain'] |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #cb4335 |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |
| .. py:class:: PubSubDeleteSubscriptionOperator(*, subscription, project_id = None, fail_if_not_exists = False, gcp_conn_id = 'google_cloud_default', delegate_to = None, retry = DEFAULT, timeout = None, metadata = (), impersonation_chain = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Delete a PubSub subscription. |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:PubSubDeleteSubscriptionOperator` |
| |
| By default, if the subscription does not exist, this operator will |
| not cause the DAG to fail. :: |
| |
| with DAG('successful DAG') as dag: |
| PubSubDeleteSubscriptionOperator(project_id='my-project', subscription='non-existing') |
| |
| The operator can be configured to fail if the subscription already exists. |
| |
| :: |
| |
| with DAG('failing DAG') as dag: |
| PubSubDeleteSubscriptionOperator( |
| project_id='my-project', subscription='non-existing', fail_if_not_exists=True, |
| ) |
| |
| ``project_id``, and ``subscription`` are templated so you can use Jinja templating in their values. |
| |
| :param project_id: Optional, the Google Cloud project ID in which to work (templated). |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param subscription: the subscription to delete. Do not include the |
| full subscription path. In other words, instead of |
| ``projects/{project}/subscription/{subscription}``, provide only |
| ``{subscription}``. (templated) |
| :param fail_if_not_exists: If True and the subscription does not exist, |
| fail the task |
| :param gcp_conn_id: The connection ID to use connecting to |
| Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param retry: (Optional) A retry object used to retry requests. |
| If None is specified, requests will not be retried. |
| :param timeout: (Optional) The amount of time, in seconds, to wait for the request |
| to complete. Note that if retry is specified, the timeout applies to each |
| individual attempt. |
| :param metadata: (Optional) Additional metadata that is provided to the method. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['project_id', 'subscription', 'impersonation_chain'] |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #cb4335 |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |
| .. py:class:: PubSubPublishMessageOperator(*, topic, messages, project_id = None, gcp_conn_id = 'google_cloud_default', delegate_to = None, impersonation_chain = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Publish messages to a PubSub topic. |
| |
| .. seealso:: |
| For more information on how to use this operator, take a look at the guide: |
| :ref:`howto/operator:PubSubPublishMessageOperator` |
| |
| Each Task publishes all provided messages to the same topic |
| in a single Google Cloud project. If the topic does not exist, this |
| task will fail. :: |
| |
| m1 = {'data': b'Hello, World!', |
| 'attributes': {'type': 'greeting'} |
| } |
| m2 = {'data': b'Knock, knock'} |
| m3 = {'attributes': {'foo': ''}} |
| |
| t1 = PubSubPublishMessageOperator( |
| project_id='my-project', |
| topic='my_topic', |
| messages=[m1, m2, m3], |
| create_topic=True, |
| dag=dag, |
| ) |
| |
| ``project_id``, ``topic``, and ``messages`` are templated so you can use Jinja templating |
| in their values. |
| |
| :param project_id: Optional, the Google Cloud project ID in which to work (templated). |
| If set to None or missing, the default project_id from the Google Cloud connection is used. |
| :param topic: the topic to which to publish. Do not include the |
| full topic path. In other words, instead of |
| ``projects/{project}/topics/{topic}``, provide only |
| ``{topic}``. (templated) |
| :param messages: a list of messages to be published to the |
| topic. Each message is a dict with one or more of the |
| following keys-value mappings: |
| * 'data': a bytestring (utf-8 encoded) |
| * 'attributes': {'key1': 'value1', ...} |
| Each message must contain at least a non-empty 'data' value |
| or an attribute dict with at least one key (templated). See |
| https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage |
| :param gcp_conn_id: The connection ID to use connecting to |
| Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['project_id', 'topic', 'messages', 'impersonation_chain'] |
| |
| |
| |
| .. py:attribute:: ui_color |
| :annotation: = #0273d4 |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |
| .. py:class:: PubSubPullOperator(*, project_id, subscription, max_messages = 5, ack_messages = False, messages_callback = None, gcp_conn_id = 'google_cloud_default', delegate_to = None, impersonation_chain = None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator` |
| |
| Pulls messages from a PubSub subscription and passes them through XCom. |
| If the queue is empty, returns empty list - never waits for messages. |
| If you do need to wait, please use :class:`airflow.providers.google.cloud.sensors.PubSubPullSensor` |
| instead. |
| |
| .. seealso:: |
| For more information on how to use this operator and the PubSubPullSensor, take a look at the guide: |
| :ref:`howto/operator:PubSubPullSensor` |
| |
| This operator will pull up to ``max_messages`` messages from the |
| specified PubSub subscription. When the subscription returns messages, |
| the messages will be returned immediately from the operator and passed through XCom for downstream tasks. |
| |
| If ``ack_messages`` is set to True, messages will be immediately |
| acknowledged before being returned, otherwise, downstream tasks will be |
| responsible for acknowledging them. |
| |
| ``project_id `` and ``subscription`` are templated so you can use Jinja templating in their values. |
| |
| :param project_id: the Google Cloud project ID for the subscription (templated) |
| :param subscription: the Pub/Sub subscription name. Do not include the |
| full subscription path. |
| :param max_messages: The maximum number of messages to retrieve per |
| PubSub pull request |
| :param ack_messages: If True, each message will be acknowledged |
| immediately rather than by any downstream tasks |
| :param gcp_conn_id: The connection ID to use connecting to |
| Google Cloud. |
| :param delegate_to: The account to impersonate using domain-wide delegation of authority, |
| if any. For this to work, the service account making the request must have |
| domain-wide delegation enabled. |
| :param messages_callback: (Optional) Callback to process received messages. |
| It's return value will be saved to XCom. |
| If you are pulling large messages, you probably want to provide a custom callback. |
| If not provided, the default implementation will convert `ReceivedMessage` objects |
| into JSON-serializable dicts using `google.protobuf.json_format.MessageToDict` function. |
| :param impersonation_chain: Optional service account to impersonate using short-term |
| credentials, or chained list of accounts required to get the access_token |
| of the last account in the list, which will be impersonated in the request. |
| If set as a string, the account must grant the originating account |
| the Service Account Token Creator IAM role. |
| If set as a sequence, the identities from the list must grant |
| Service Account Token Creator IAM role to the directly preceding identity, with first |
| account from the list granting this role to the originating account (templated). |
| |
| .. py:attribute:: template_fields |
| :annotation: :Sequence[str] = ['project_id', 'subscription', 'impersonation_chain'] |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |