:mod:`airflow.contrib.operators.dynamodb_to_s3`
===============================================
.. py:module:: airflow.contrib.operators.dynamodb_to_s3
.. autoapi-nested-parse::
This module contains operators to replicate records from
a DynamoDB table to S3.
Module Contents
---------------
.. function:: _convert_item_to_json_bytes(item)
.. function:: _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix)
.. py:class:: DynamoDBToS3Operator(dynamodb_table_name, s3_bucket_name, file_size, dynamodb_scan_kwargs=None, s3_key_prefix='', process_func=_convert_item_to_json_bytes, *args, **kwargs)
Bases: :class:`airflow.models.baseoperator.BaseOperator`
Replicates records from a DynamoDB table to S3.
It scans a DynamoDB table and writes the received records to a file
on the local filesystem. It flushes the file to S3 once the file size
exceeds the file size limit specified by the user.
Users can also specify filtering criteria using dynamodb_scan_kwargs
to replicate only the records that satisfy the criteria.
To parallelize the replication, users can create multiple DynamoDBToS3Operator tasks.
For instance, to replicate with a parallelism of 2, create two tasks like:
.. code-block::

    op1 = DynamoDBToS3Operator(
        task_id='replicator-1',
        dynamodb_table_name='hello',
        dynamodb_scan_kwargs={
            'TotalSegments': 2,
            'Segment': 0,
        },
        ...
    )
    op2 = DynamoDBToS3Operator(
        task_id='replicator-2',
        dynamodb_table_name='hello',
        dynamodb_scan_kwargs={
            'TotalSegments': 2,
            'Segment': 1,
        },
        ...
    )

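For a higher degree of parallelism, the per-task scan kwargs can be generated instead of written by hand. The following is a minimal sketch, not part of this module: ``segment_scan_kwargs`` is a hypothetical helper, and only the ``TotalSegments`` and ``Segment`` keys come from the example above.

```python
def segment_scan_kwargs(total_segments, base_kwargs=None):
    """Return one scan-kwargs dict per parallel scan segment.

    Hypothetical helper (not provided by Airflow): each returned dict is
    meant to be passed as ``dynamodb_scan_kwargs`` to one task.
    """
    if total_segments < 1:
        raise ValueError("total_segments must be at least 1")
    base = dict(base_kwargs or {})
    return [
        # Segments are numbered from 0 to total_segments - 1.
        dict(base, TotalSegments=total_segments, Segment=segment)
        for segment in range(total_segments)
    ]


if __name__ == "__main__":
    for index, scan_kwargs in enumerate(segment_scan_kwargs(2), start=1):
        # One DynamoDBToS3Operator task would be created per entry.
        print("replicator-{}".format(index), scan_kwargs)
```

Every task must use the same ``TotalSegments`` value and a distinct ``Segment`` value; otherwise some items are scanned twice or not at all.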
:param dynamodb_table_name: DynamoDB table to replicate data from
:param s3_bucket_name: S3 bucket to replicate data to
:param file_size: Flush the file to S3 if its size is >= file_size
:param dynamodb_scan_kwargs: kwargs passed to DynamoDB.Table.scan; see <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan>
:param s3_key_prefix: Prefix of the S3 object key
:param process_func: How a DynamoDB item is transformed to bytes. By default, the item is dumped as JSON
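A custom ``process_func`` takes one DynamoDB item and returns bytes. The sketch below is an illustration rather than the module's default: ``convert_item_to_json_line`` and ``_decimal_default`` are hypothetical names, and the one-JSON-object-per-line layout is an assumption. It handles the ``decimal.Decimal`` values that the boto3 resource API returns for DynamoDB numbers, which ``json.dumps`` cannot serialize on its own.

```python
import json
from decimal import Decimal


def _decimal_default(value):
    """Fallback for json.dumps: convert Decimal to int or float."""
    if isinstance(value, Decimal):
        # Whole numbers become int, everything else becomes float.
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    raise TypeError("Cannot serialize {!r}".format(type(value)))


def convert_item_to_json_line(item):
    """Hypothetical process_func: one UTF-8 encoded JSON object per line."""
    line = json.dumps(item, default=_decimal_default, sort_keys=True)
    return (line + "\n").encode("utf-8")


if __name__ == "__main__":
    print(convert_item_to_json_line({"id": "a", "count": Decimal("3")}))
```

Such a function would be passed as ``process_func=convert_item_to_json_line`` when constructing the operator.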
.. method:: execute(self, context)
.. method:: _scan_dynamodb_and_upload_to_s3(self, temp_file, scan_kwargs, table)
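The scan-and-flush behavior described in the class docstring can be sketched as follows. This is a simplified illustration under stated assumptions, not the operator's actual implementation: ``scan_and_flush``, ``FakeTable``, and the ``upload`` callback are hypothetical stand-ins so the example runs without AWS access. Only the ``LastEvaluatedKey`` and ``ExclusiveStartKey`` pagination keys are taken from the DynamoDB scan API.

```python
import json
import tempfile


def scan_and_flush(table, scan_kwargs, file_size, process_func, upload):
    """Scan page by page, buffering to a temp file and flushing by size."""
    scan_kwargs = dict(scan_kwargs or {})
    temp_file = tempfile.NamedTemporaryFile()
    try:
        while True:
            response = table.scan(**scan_kwargs)
            for item in response["Items"]:
                temp_file.write(process_func(item))
            if "LastEvaluatedKey" not in response:
                break  # no more pages
            # Resume the next scan where this page ended.
            scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
            if temp_file.tell() >= file_size:
                temp_file.flush()
                temp_file.seek(0)
                upload(temp_file)
                temp_file.close()
                temp_file = tempfile.NamedTemporaryFile()
        if temp_file.tell() > 0:
            # Upload whatever is left after the last page.
            temp_file.flush()
            temp_file.seek(0)
            upload(temp_file)
    finally:
        temp_file.close()


class FakeTable:
    """In-memory stand-in for a DynamoDB table (assumption for the demo).

    A real ExclusiveStartKey is a dict of key attributes; here it is
    simply the index of the next page.
    """

    def __init__(self, pages):
        self._pages = pages

    def scan(self, **kwargs):
        index = kwargs.get("ExclusiveStartKey", 0)
        response = {"Items": self._pages[index]}
        if index + 1 < len(self._pages):
            response["LastEvaluatedKey"] = index + 1
        return response


def to_json_line(item):
    return (json.dumps(item) + "\n").encode("utf-8")


if __name__ == "__main__":
    uploads = []
    table = FakeTable([[{"id": 1}, {"id": 2}], [{"id": 3}]])
    scan_and_flush(table, {}, 1, to_json_line, lambda f: uploads.append(f.read()))
    print(uploads)
```

In this sketch the size check runs once per scanned page, so an uploaded file can exceed ``file_size`` by up to one page of records.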