.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
=========
Amazon S3
=========
`Amazon Simple Storage Service (Amazon S3) <https://aws.amazon.com/s3/>`__ is storage for the internet.
You can use Amazon S3 to store and retrieve any amount of data at any time, from anywhere on the web.
Prerequisite Tasks
------------------
.. include:: _partials/prerequisite_tasks.rst
Operators
---------
.. _howto/operator:S3CreateBucketOperator:
Create an Amazon S3 bucket
==========================
To create an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_create_bucket]
:end-before: [END howto_operator_s3_create_bucket]
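If the example DAG is not at hand, a minimal sketch of the task could look like the following (the bucket name and ``task_id`` are placeholders, and the code is assumed to live inside a DAG definition):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator

    # Sketch only; "my-example-bucket" is a placeholder and must be replaced
    # with a globally unique bucket name.
    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        bucket_name="my-example-bucket",
    )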
.. _howto/operator:S3DeleteBucketOperator:
Delete an Amazon S3 bucket
==========================
To delete an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_delete_bucket]
:end-before: [END howto_operator_s3_delete_bucket]
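A minimal sketch, assuming placeholder names, could look like this; ``force_delete=True`` is used because a non-empty bucket cannot be deleted otherwise:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3DeleteBucketOperator

    # Sketch only; with force_delete=True any remaining objects are removed
    # before the bucket itself is deleted.
    delete_bucket = S3DeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name="my-example-bucket",
        force_delete=True,
    )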
.. _howto/operator:S3PutBucketTaggingOperator:
Set the tags for an Amazon S3 bucket
====================================
To set the tags for an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_put_bucket_tagging]
:end-before: [END howto_operator_s3_put_bucket_tagging]
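A minimal sketch, assuming the single ``key``/``value`` form of the operator (bucket name and tag are placeholders):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3PutBucketTaggingOperator

    # Sketch only; alternatively a ``tag_set`` of {"Key": ..., "Value": ...}
    # dictionaries can be passed instead of a single key/value pair.
    put_bucket_tagging = S3PutBucketTaggingOperator(
        task_id="put_bucket_tagging",
        bucket_name="my-example-bucket",
        key="environment",
        value="staging",
    )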
.. _howto/operator:S3GetBucketTaggingOperator:
Get the tags of an Amazon S3 bucket
===================================
To get the tag set associated with an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_get_bucket_tagging]
:end-before: [END howto_operator_s3_get_bucket_tagging]
.. _howto/operator:S3DeleteBucketTaggingOperator:
Delete the tags of an Amazon S3 bucket
======================================
To delete the tags of an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_delete_bucket_tagging]
:end-before: [END howto_operator_s3_delete_bucket_tagging]
.. _howto/operator:S3CreateObjectOperator:
Create an Amazon S3 object
==========================
To create a new Amazon S3 object (or replace an existing one) you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_create_object]
:end-before: [END howto_operator_s3_create_object]
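A minimal sketch (bucket, key, and payload are placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

    # Sketch only; ``data`` may be a string or bytes, and ``replace=True``
    # overwrites an existing object with the same key.
    create_object = S3CreateObjectOperator(
        task_id="create_object",
        s3_bucket="my-example-bucket",
        s3_key="files/sample.txt",
        data="hello from Airflow",
        replace=True,
    )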
.. _howto/operator:S3CopyObjectOperator:
Copy an Amazon S3 object
========================
To copy an Amazon S3 object from one bucket to another you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator`.
The Amazon S3 connection used here needs to have access to both source and destination bucket/key.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_copy_object]
:end-before: [END howto_operator_s3_copy_object]
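A minimal sketch (bucket and key names are placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

    # Sketch only; the Amazon S3 connection in use must be able to read the
    # source key and write the destination key.
    copy_object = S3CopyObjectOperator(
        task_id="copy_object",
        source_bucket_name="source-bucket",
        source_bucket_key="files/sample.txt",
        dest_bucket_name="dest-bucket",
        dest_bucket_key="backup/sample.txt",
    )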
.. _howto/operator:S3DeleteObjectsOperator:
Delete Amazon S3 objects
========================
To delete one or more Amazon S3 objects you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_delete_objects]
:end-before: [END howto_operator_s3_delete_objects]
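A minimal sketch (bucket and keys are placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator

    # Sketch only; ``keys`` accepts a single key or a list of keys, and a
    # ``prefix`` can be given instead to delete every object under that prefix.
    delete_objects = S3DeleteObjectsOperator(
        task_id="delete_objects",
        bucket="my-example-bucket",
        keys=["files/sample.txt", "backup/sample.txt"],
    )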
.. _howto/operator:S3FileTransformOperator:
Transform an Amazon S3 object
=============================
To transform the data from one Amazon S3 object and save it to another object you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator`.
You can also apply an optional `Amazon S3 Select expression <https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-select.html>`__
to select the data you want to retrieve from ``source_s3_key`` using ``select_expression``.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_file_transform]
:end-before: [END howto_operator_s3_file_transform]
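As a hedged sketch of the ``select_expression`` usage described above (the S3 URLs and the SQL expression are placeholders):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator

    # Sketch only; the S3 Select expression filters the source data before it
    # is written to the destination key. A ``transform_script`` can be supplied
    # as well (or instead) to post-process the data locally.
    transform_object = S3FileTransformOperator(
        task_id="transform_object",
        source_s3_key="s3://my-example-bucket/input/data.csv",
        dest_s3_key="s3://my-example-bucket/output/filtered.csv",
        select_expression="SELECT * FROM S3Object s WHERE s._1 = 'value'",
        replace=True,
    )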
.. _howto/operator:S3ListPrefixesOperator:
List Amazon S3 prefixes
=======================
To list all Amazon S3 prefixes within an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3ListPrefixesOperator`.
See `here <https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html>`__
for more information about Amazon S3 prefixes.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_list_prefixes]
:end-before: [END howto_operator_s3_list_prefixes]
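A minimal sketch (bucket and prefix are placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3ListPrefixesOperator

    # Sketch only; with delimiter "/" this lists the "sub-folders" directly
    # under the given prefix.
    list_prefixes = S3ListPrefixesOperator(
        task_id="list_prefixes",
        bucket="my-example-bucket",
        prefix="files/",
        delimiter="/",
    )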
.. _howto/operator:S3ListOperator:
List Amazon S3 objects
======================
To list all Amazon S3 objects within an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3ListOperator`.
You can specify a ``prefix`` to list only the objects whose names begin with that prefix.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_list]
:end-before: [END howto_operator_s3_list]
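A minimal sketch (bucket and prefix are placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3ListOperator

    # Sketch only; only objects whose keys start with "files/" are returned.
    list_objects = S3ListOperator(
        task_id="list_objects",
        bucket="my-example-bucket",
        prefix="files/",
    )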
Sensors
-------
.. _howto/sensor:S3KeySensor:
Wait on an Amazon S3 key
========================
To wait for one or more keys to be present in an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.
For each key, the sensor calls the
`head_object <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object>`__
API (or the `list_objects_v2 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.list_objects_v2>`__
API if ``wildcard_match`` is ``True``) to check whether the key is present.
Keep in mind that the sensor makes one API call per key, which adds up when checking a large number of keys.
To check one file:
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_single_key]
:end-before: [END howto_sensor_s3_key_single_key]
To check multiple files:
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_multiple_keys]
:end-before: [END howto_sensor_s3_key_multiple_keys]
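As a hedged sketch of the wildcard behaviour mentioned above (bucket name and pattern are placeholders), ``wildcard_match=True`` makes the sensor treat ``bucket_key`` as a Unix glob pattern:

.. code-block:: python

    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    # Sketch only; any object whose key matches "files/*.csv" satisfies the sensor.
    wait_for_csv = S3KeySensor(
        task_id="wait_for_csv",
        bucket_name="my-example-bucket",
        bucket_key="files/*.csv",
        wildcard_match=True,
    )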
To run an additional custom check, you can define a function that receives a list of matched S3 object
attributes and returns a boolean:

- ``True``: the criterion is met
- ``False``: the criterion is not met

This function is called for each key passed in ``bucket_key``.
The parameter is a list of objects because, when ``wildcard_match`` is ``True``,
multiple files can match one key. The list of matched S3 object attributes contains only the size, in this format:
.. code-block:: python
[{"Size": int}]
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_function_definition]
:end-before: [END howto_sensor_s3_key_function_definition]
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_function]
:end-before: [END howto_sensor_s3_key_function]
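Putting the pieces together, a hedged sketch of such a check (bucket, pattern, and the 1 MB threshold are arbitrary placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


    def check_fn(files: list) -> bool:
        """Return True only when every matched object is larger than 1 MB."""
        return all(f.get("Size", 0) > 1024 * 1024 for f in files)


    wait_for_large_file = S3KeySensor(
        task_id="wait_for_large_file",
        bucket_name="my-example-bucket",
        bucket_key="files/*.csv",
        wildcard_match=True,
        check_fn=check_fn,
    )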
.. _howto/sensor:S3KeysUnchangedSensor:
Wait on Amazon S3 prefix changes
================================
To check for changes in the number of objects at a specific prefix in an Amazon S3 bucket and wait until
the inactivity period has passed with no increase in the number of objects, you can use
:class:`~airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor`.
Note that this sensor will not behave correctly in reschedule mode,
as the state of the listed objects in the Amazon S3 bucket will be lost between rescheduled invocations.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_keys_unchanged]
:end-before: [END howto_sensor_s3_keys_unchanged]
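A minimal sketch (bucket, prefix, and timing values are placeholders) could look like this:

.. code-block:: python

    from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor

    # Sketch only; the sensor succeeds once no new objects have appeared under
    # the prefix for ``inactivity_period`` seconds and at least ``min_objects`` exist.
    wait_for_stable_prefix = S3KeysUnchangedSensor(
        task_id="wait_for_stable_prefix",
        bucket_name="my-example-bucket",
        prefix="files/",
        inactivity_period=600,
        min_objects=1,
    )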
Reference
---------
* `AWS boto3 library documentation for S3 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html>`__