# blob: 60e6f319b2c96de329df60fa2ec31ef0fa8b7575 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the S3 File System"""
# pytype: skip-file
import logging
import unittest
import mock
from apache_beam.io.aws.clients.s3 import messages
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import FileMetadata
from apache_beam.options.pipeline_options import PipelineOptions
# Protect against environments where boto3 library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apache_beam.io.aws import s3filesystem
except ImportError:
s3filesystem = None # type: ignore[assignment]
# pylint: enable=wrong-import-order, wrong-import-position
@unittest.skipIf(s3filesystem is None, 'AWS dependencies are not installed')
class S3FileSystemTest(unittest.TestCase):
  """Unit tests for S3FileSystem with the ``s3io`` module mocked out.

  Tests that touch storage patch ``apache_beam.io.aws.s3filesystem.s3io`` and
  make its ``S3IO`` factory return a single ``MagicMock``. The tests can then
  assert on exactly which calls the file system forwarded to the client.
  """

  def setUp(self):
    pipeline_options = PipelineOptions()
    self.fs = s3filesystem.S3FileSystem(pipeline_options=pipeline_options)

  @staticmethod
  def _make_s3io_mock(mock_s3io_module):
    """Return the mock client that the patched ``s3io.S3IO`` will produce.

    Args:
      mock_s3io_module: the ``MagicMock`` that ``mock.patch`` injected in place
        of ``apache_beam.io.aws.s3filesystem.s3io``.

    Returns:
      A ``MagicMock``. Every ``s3io.S3IO(...)`` call made while the patch is
      active returns it, whatever arguments are passed.
    """
    s3io_mock = mock.MagicMock()
    # Configure the injected module mock rather than assigning a lambda onto
    # it. This keeps the stub independent of how S3IO's constructor is called.
    mock_s3io_module.S3IO.return_value = s3io_mock
    return s3io_mock

  def test_scheme(self):
    self.assertEqual(self.fs.scheme(), 's3')
    self.assertEqual(s3filesystem.S3FileSystem.scheme(), 's3')

  def test_join(self):
    self.assertEqual(
        's3://bucket/path/to/file',
        self.fs.join('s3://bucket/path', 'to', 'file'))
    self.assertEqual(
        's3://bucket/path/to/file', self.fs.join('s3://bucket/path', 'to/file'))
    self.assertEqual(
        's3://bucket/path/to/file',
        self.fs.join('s3://bucket/path', '/to/file'))
    self.assertEqual(
        's3://bucket/path/to/file',
        self.fs.join('s3://bucket/path/', 'to', 'file'))
    self.assertEqual(
        's3://bucket/path/to/file',
        self.fs.join('s3://bucket/path/', 'to/file'))
    self.assertEqual(
        's3://bucket/path/to/file',
        self.fs.join('s3://bucket/path/', '/to/file'))
    # A base path without the s3:// scheme is rejected.
    with self.assertRaises(ValueError):
      self.fs.join('/bucket/path/', '/to/file')

  def test_split(self):
    self.assertEqual(('s3://foo/bar', 'baz'), self.fs.split('s3://foo/bar/baz'))
    self.assertEqual(('s3://foo', ''), self.fs.split('s3://foo/'))
    self.assertEqual(('s3://foo', ''), self.fs.split('s3://foo'))
    # A path without the s3:// scheme is rejected.
    with self.assertRaises(ValueError):
      self.fs.split('/no/s3/prefix')

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_match_single(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    s3io_mock._status.return_value = {'size': 1, 'last_updated': 9999999.0}
    expected_results = [FileMetadata('s3://bucket/file1', 1, 9999999.0)]
    match_result = self.fs.match(['s3://bucket/file1'])[0]
    self.assertEqual(match_result.metadata_list, expected_results)
    s3io_mock._status.assert_called_once_with('s3://bucket/file1')

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_match_multiples(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    s3io_mock.list_files.return_value = iter([
        ('s3://bucket/file1', (1, 9999999.0)),
        ('s3://bucket/file2', (2, 8888888.0)),
    ])
    expected_results = {
        FileMetadata('s3://bucket/file1', 1, 9999999.0),
        FileMetadata('s3://bucket/file2', 2, 8888888.0),
    }
    match_result = self.fs.match(['s3://bucket/'])[0]
    self.assertEqual(set(match_result.metadata_list), expected_results)
    s3io_mock.list_files.assert_called_once_with(
        's3://bucket/', with_metadata=True)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_match_multiples_limit(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    limit = 1
    # List more objects than the limit. Otherwise the test cannot tell whether
    # match() honours the limit or simply returns everything it listed.
    s3io_mock.list_files.return_value = iter([
        ('s3://bucket/file1', (1, 99999.0)),
        ('s3://bucket/file2', (2, 88888.0)),
    ])
    expected_results = {FileMetadata('s3://bucket/file1', 1, 99999.0)}
    match_result = self.fs.match(['s3://bucket/'], [limit])[0]
    self.assertEqual(set(match_result.metadata_list), expected_results)
    self.assertEqual(len(match_result.metadata_list), limit)
    s3io_mock.list_files.assert_called_once_with(
        's3://bucket/', with_metadata=True)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_match_multiples_error(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    exception = IOError('Failed')
    s3io_mock.list_files.side_effect = exception
    # The listing failure is expected to surface as a BeamIOError.
    with self.assertRaises(BeamIOError) as error:
      self.fs.match(['s3://bucket/'])
    self.assertIn('Match operation failed', str(error.exception))
    s3io_mock.list_files.assert_called_once_with(
        's3://bucket/', with_metadata=True)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_match_multiple_patterns(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    # One listing result per pattern, consumed in call order.
    s3io_mock.list_files.side_effect = [
        iter([('s3://bucket/file1', (1, 99999.0))]),
        iter([('s3://bucket/file2', (2, 88888.0))]),
    ]
    expected_results = [[FileMetadata('s3://bucket/file1', 1, 99999.0)],
                        [FileMetadata('s3://bucket/file2', 2, 88888.0)]]
    result = self.fs.match(['s3://bucket/file1*', 's3://bucket/file2*'])
    self.assertEqual([mr.metadata_list for mr in result], expected_results)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_create(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    # Issue file creation.
    _ = self.fs.create('s3://bucket/from1', 'application/octet-stream')
    s3io_mock.open.assert_called_once_with(
        's3://bucket/from1', 'wb', mime_type='application/octet-stream')

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_open(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    # Open the file for reading.
    _ = self.fs.open('s3://bucket/from1', 'application/octet-stream')
    s3io_mock.open.assert_called_once_with(
        's3://bucket/from1', 'rb', mime_type='application/octet-stream')

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_copy_file(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    sources = ['s3://bucket/from1', 's3://bucket/from2']
    destinations = ['s3://bucket/to1', 's3://bucket/to2']
    # Issue file copy.
    self.fs.copy(sources, destinations)
    src_dest_pairs = list(zip(sources, destinations))
    s3io_mock.copy_paths.assert_called_once_with(src_dest_pairs)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_copy_file_error(self, mock_s3io_module):
    self._make_s3io_mock(mock_s3io_module)
    # Three sources but only two destinations.
    sources = ['s3://bucket/from1', 's3://bucket/from2', 's3://bucket/from3']
    destinations = ['s3://bucket/to1', 's3://bucket/to2']
    # Issue file copy; the length mismatch must be rejected.
    with self.assertRaises(BeamIOError):
      self.fs.copy(sources, destinations)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_delete(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    s3io_mock.size.return_value = 0
    files = [
        's3://bucket/from1',
        's3://bucket/from2',
        's3://bucket/from3',
    ]
    # Issue batch delete.
    self.fs.delete(files)
    s3io_mock.delete_paths.assert_called_once_with(files)

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_delete_error(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    problematic_directory = 's3://nonexistent-bucket/tree/'
    exception = messages.S3ClientError('Not found', 404)
    # Per-path results from the client: only one path reports an error.
    s3io_mock.delete_paths.return_value = {
        problematic_directory: exception,
        's3://bucket/object1': None,
        's3://bucket/object2': None,
    }
    s3io_mock.size.return_value = 0
    files = [
        problematic_directory,
        's3://bucket/object1',
        's3://bucket/object2',
    ]
    # Only the failing path is expected in the raised error's details.
    expected_results = {problematic_directory: exception}
    # Issue batch delete.
    with self.assertRaises(BeamIOError) as error:
      self.fs.delete(files)
    self.assertIn('Delete operation failed', str(error.exception))
    self.assertEqual(error.exception.exception_details, expected_results)
    s3io_mock.delete_paths.assert_called()

  @mock.patch('apache_beam.io.aws.s3filesystem.s3io')
  def test_rename(self, mock_s3io_module):
    s3io_mock = self._make_s3io_mock(mock_s3io_module)
    sources = ['s3://bucket/from1', 's3://bucket/from2']
    destinations = ['s3://bucket/to1', 's3://bucket/to2']
    # Issue file rename.
    self.fs.rename(sources, destinations)
    src_dest_pairs = list(zip(sources, destinations))
    s3io_mock.rename_files.assert_called_once_with(src_dest_pairs)
if __name__ == '__main__':
  # Show INFO-level log output while running the suite directly.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()