| :mod:`airflow.contrib.operators.dynamodb_to_s3` |
| =============================================== |
| |
| .. py:module:: airflow.contrib.operators.dynamodb_to_s3 |
| |
| .. autoapi-nested-parse:: |
| |
| This module contains operators to replicate records from |
| a DynamoDB table to S3. |
| |
| |
| |
| Module Contents |
| --------------- |
| |
| .. function:: _convert_item_to_json_bytes(item) |
| |
| .. function:: _upload_file_to_s3(file_obj, bucket_name, s3_key_prefix) |
| |
| .. py:class:: DynamoDBToS3Operator(dynamodb_table_name, s3_bucket_name, file_size, dynamodb_scan_kwargs=None, s3_key_prefix='', process_func=_convert_item_to_json_bytes, *args, **kwargs) |
| |
| Bases: :class:`airflow.models.baseoperator.BaseOperator` |
| |
| Replicates records from a DynamoDB table to S3. |
| It scans a DynamoDB table and writes the received records to a file |
| on the local filesystem. It flushes the file to S3 once the file size |
| exceeds the file size limit specified by the user. |
| |
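| The flush-by-size behaviour described above can be sketched as follows. This is a |
| minimal illustration, not the operator's actual code: ``replicate_in_chunks``, |
| ``pages`` and ``upload`` are hypothetical names, and the pages of items stand in |
| for the results of successive DynamoDB scan calls. |

```python
import json
import os
import tempfile


def replicate_in_chunks(pages, file_size, upload, process_func):
    """Write items page by page to a temp file and hand the file to
    ``upload`` once its size reaches ``file_size`` bytes."""
    temp_file = tempfile.NamedTemporaryFile()
    try:
        for page in pages:
            for item in page:
                temp_file.write(process_func(item))
            temp_file.flush()  # make buffered bytes visible to getsize
            if os.path.getsize(temp_file.name) >= file_size:
                temp_file.seek(0)
                upload(temp_file)
                temp_file.close()
                temp_file = tempfile.NamedTemporaryFile()
        # Upload whatever is left after the last page, if anything.
        temp_file.flush()
        if os.path.getsize(temp_file.name) > 0:
            temp_file.seek(0)
            upload(temp_file)
    finally:
        temp_file.close()


def to_json_line(item):
    # Hypothetical stand-in for the operator's process_func.
    return (json.dumps(item) + "\n").encode("utf-8")


uploaded = []  # collects the bytes of each "uploaded" file
pages = [[{"id": 1}, {"id": 2}], [{"id": 3}]]
replicate_in_chunks(pages, file_size=10,
                    upload=lambda f: uploaded.append(f.read()),
                    process_func=to_json_line)
```

| Here ``upload`` only records the file contents; in a real task it would be replaced |
| by a call that writes the file to S3. |
| |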
| Users can also specify filtering criteria using dynamodb_scan_kwargs |
| to only replicate records that satisfy the criteria. |
| |
| To parallelize the replication, users can create multiple DynamoDBToS3Operator tasks. |
| For instance, to replicate with a parallelism of 2, create two tasks like: |
| |
| .. code-block:: python |
| |
| op1 = DynamoDBToS3Operator( |
| task_id='replicator-1', |
| dynamodb_table_name='hello', |
| dynamodb_scan_kwargs={ |
| 'TotalSegments': 2, |
| 'Segment': 0, |
| }, |
| ... |
| ) |
| |
| op2 = DynamoDBToS3Operator( |
| task_id='replicator-2', |
| dynamodb_table_name='hello', |
| dynamodb_scan_kwargs={ |
| 'TotalSegments': 2, |
| 'Segment': 1, |
| }, |
| ... |
| ) |
| |
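| For a higher degree of parallelism, the per-segment scan kwargs can be generated |
| rather than written by hand. The helper below is a hypothetical sketch |
| (``segment_scan_kwargs`` is not part of Airflow); it only builds the dictionaries, |
| each of which could then be passed as ``dynamodb_scan_kwargs`` to one task. |

```python
def segment_scan_kwargs(total_segments, base_kwargs=None):
    """Return one scan-kwargs dict per segment of a parallel scan.

    ``base_kwargs`` holds any scan arguments shared by every segment.
    """
    if total_segments < 1:
        raise ValueError("total_segments must be >= 1")
    base = dict(base_kwargs or {})
    return [
        {**base, "TotalSegments": total_segments, "Segment": segment}
        for segment in range(total_segments)
    ]


# Two dicts, matching the two tasks in the example above.
all_kwargs = segment_scan_kwargs(2)
```

| |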
| :param dynamodb_table_name: DynamoDB table to replicate data from |
| :param s3_bucket_name: S3 bucket to replicate data to |
| :param file_size: Flush file to S3 if file size >= file_size |
| :param dynamodb_scan_kwargs: kwargs passed to <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan> |
| :param s3_key_prefix: Prefix of the S3 object key |
| :param process_func: How a DynamoDB item is transformed to bytes. By default, the item is dumped as JSON |
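| |
| A custom ``process_func`` takes one item and returns bytes. The sketch below is one |
| possible example, not part of the module: ``convert_item_with_decimals`` is a |
| hypothetical name. It handles the ``decimal.Decimal`` values that boto3's table |
| resource returns for numeric attributes, which plain ``json.dumps`` cannot serialize. |

```python
import json
from decimal import Decimal


def _decimal_default(value):
    """Fallback for json.dumps: turn Decimal into int or float."""
    if isinstance(value, Decimal):
        if value == value.to_integral_value():
            return int(value)
        return float(value)  # may lose precision for very long decimals
    raise TypeError("Cannot serialize %r" % (value,))


def convert_item_with_decimals(item):
    """Serialize one item as a newline-terminated JSON line in UTF-8."""
    return (json.dumps(item, default=_decimal_default) + "\n").encode("utf-8")


line = convert_item_with_decimals({"id": Decimal("1"), "price": Decimal("9.5")})
```

| Such a function could be passed as ``process_func=convert_item_with_decimals`` when |
| creating the operator. |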
| |
| |
| .. method:: execute(self, context) |
| |
| |
| |
| |
| .. method:: _scan_dynamodb_and_upload_to_s3(self, temp_file, scan_kwargs, table) |
| |
| |
| |
| |