#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for S3 client."""
# pytype: skip-file
from __future__ import absolute_import
import logging
import os
import random
import time
import unittest
from apache_beam.io.aws import s3io
from apache_beam.io.aws.clients.s3 import fake_client
from apache_beam.io.aws.clients.s3 import messages
class TestS3PathParser(unittest.TestCase):
BAD_S3_PATHS = [
's3://',
's3://bucket',
's3:///name',
's3:///',
's3:/blah/bucket/name',
]
def test_s3_path(self):
self.assertEqual(
s3io.parse_s3_path('s3://bucket/name'), ('bucket', 'name'))
self.assertEqual(
s3io.parse_s3_path('s3://bucket/name/sub'), ('bucket', 'name/sub'))
def test_bad_s3_path(self):
for path in self.BAD_S3_PATHS:
self.assertRaises(ValueError, s3io.parse_s3_path, path)
self.assertRaises(ValueError, s3io.parse_s3_path, 's3://bucket/')
def test_s3_path_object_optional(self):
self.assertEqual(
s3io.parse_s3_path('s3://bucket/name', object_optional=True),
('bucket', 'name'))
self.assertEqual(
s3io.parse_s3_path('s3://bucket/', object_optional=True),
('bucket', ''))
def test_bad_s3_path_object_optional(self):
for path in self.BAD_S3_PATHS:
self.assertRaises(ValueError, s3io.parse_s3_path, path, True)
class TestS3IO(unittest.TestCase):
def _insert_random_file(self, client, path, size):
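    """Writes `size` random bytes to `path` and returns them as a FakeFile.

    Goes through the fake client when mocking, or through S3IO against real
    S3 otherwise, so tests behave the same either way.
    """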
bucket, name = s3io.parse_s3_path(path)
contents = os.urandom(size)
    fake_file = fake_client.FakeFile(bucket, name, contents)
    if self.USE_MOCK:
      self.client.add_file(fake_file)
    else:
      f = self.aws.open(path, 'w')
      f.write(contents)
      f.close()
    return fake_file
def setUp(self):
# These tests can be run locally against a mock S3 client, or as integration
# tests against the real S3 client.
self.USE_MOCK = True
    # If you're running integration tests against S3, set this variable to
    # an S3 path you have write access to, where test data can be stored.
    # If you're only running tests against the mock, any S3 path will do.
    # It must end with a '/'.
self.TEST_DATA_PATH = 's3://random-data-sets/beam_tests/'
if self.USE_MOCK:
self.client = fake_client.FakeS3Client()
test_data_bucket, _ = s3io.parse_s3_path(self.TEST_DATA_PATH)
self.client.known_buckets.add(test_data_bucket)
self.aws = s3io.S3IO(self.client)
else:
self.aws = s3io.S3IO()
self.client = self.aws.client
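    # To run as integration tests, set USE_MOCK = False above; credentials
    # are then resolved by the underlying boto3 client's default chain
    # (environment variables, shared config files, or an instance profile).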
def test_size(self):
file_name = self.TEST_DATA_PATH + 'dummy_file'
file_size = 1234
self._insert_random_file(self.client, file_name, file_size)
self.assertTrue(self.aws.exists(file_name))
self.assertEqual(1234, self.aws.size(file_name))
# Clean up
self.aws.delete(file_name)
def test_last_updated(self):
file_name = self.TEST_DATA_PATH + 'dummy_file'
file_size = 1234
self._insert_random_file(self.client, file_name, file_size)
self.assertTrue(self.aws.exists(file_name))
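    # Allow generous tolerance for clock skew between this machine and the
    # S3 backend when running as an integration test.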
tolerance = 5 * 60 # 5 mins
low_bound, high_bound = time.time() - tolerance, time.time() + tolerance
result = self.aws.last_updated(file_name)
self.assertTrue(low_bound <= result <= high_bound)
# Clean up
self.aws.delete(file_name)
def test_checksum(self):
file_name = self.TEST_DATA_PATH + 'checksum'
file_size = 1024
file_ = self._insert_random_file(self.client, file_name, file_size)
original_etag = self.aws.checksum(file_name)
self.aws.delete(file_name)
with self.aws.open(file_name, 'w') as f:
f.write(file_.contents)
rewritten_etag = self.aws.checksum(file_name)
self.assertEqual(original_etag, rewritten_etag)
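    # S3 ETags for multipart uploads take the form '"<md5-hex>-<part-count>"':
    # for a single-part upload that is 32 hex digits, a '-1' suffix, and two
    # quote characters, 36 characters in all.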
self.assertEqual(len(original_etag), 36)
self.assertTrue(original_etag.endswith('-1"'))
# Clean up
self.aws.delete(file_name)
def test_copy(self):
src_file_name = self.TEST_DATA_PATH + 'source'
dest_file_name = self.TEST_DATA_PATH + 'dest'
file_size = 1024
self._insert_random_file(self.client, src_file_name, file_size)
    self.assertIn(src_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
    self.assertNotIn(
        dest_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
self.aws.copy(src_file_name, dest_file_name)
    self.assertIn(src_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
    self.assertIn(dest_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
# Clean up
self.aws.delete_files([src_file_name, dest_file_name])
# Test copy of non-existent files.
with self.assertRaises(messages.S3ClientError) as err:
self.aws.copy(self.TEST_DATA_PATH + 'non-existent',
self.TEST_DATA_PATH + 'non-existent-destination')
    self.assertIn('Not Found', err.exception.message)
def test_copy_paths(self):
from_name_pattern = self.TEST_DATA_PATH + 'copy_me_%d'
to_name_pattern = self.TEST_DATA_PATH + 'destination_%d'
file_size = 1024
num_files = 10
src_dest_pairs = [(from_name_pattern % i, to_name_pattern % i)
for i in range(num_files)]
result = self.aws.copy_paths(src_dest_pairs)
self.assertTrue(result)
for i, (src, dest, exception) in enumerate(result):
self.assertEqual(src, from_name_pattern % i)
self.assertEqual(dest, to_name_pattern % i)
      self.assertIsInstance(exception, messages.S3ClientError)
self.assertEqual(exception.code, 404)
self.assertFalse(self.aws.exists(from_name_pattern % i))
self.assertFalse(self.aws.exists(to_name_pattern % i))
# Insert some files.
for i in range(num_files):
self._insert_random_file(self.client, from_name_pattern % i, file_size)
# Check files inserted properly.
for i in range(num_files):
self.assertTrue(self.aws.exists(from_name_pattern % i))
# Execute batch copy.
result = self.aws.copy_paths(src_dest_pairs)
# Check files copied properly.
for i in range(num_files):
self.assertTrue(self.aws.exists(from_name_pattern % i))
self.assertTrue(self.aws.exists(to_name_pattern % i))
# Check results
for i, (src, dest, exception) in enumerate(result):
self.assertEqual(src_dest_pairs[i], (src, dest))
      self.assertIsNone(exception)
# Clean up
all_files = set().union(*[set(pair) for pair in src_dest_pairs])
self.aws.delete_files(all_files)
def test_copy_paths_error(self):
n_real_files = 3
# Create some files
from_path = self.TEST_DATA_PATH + 'copy_paths/'
files = [from_path + '%d' % i for i in range(n_real_files)]
to_path = self.TEST_DATA_PATH + 'destination/'
destinations = [to_path + '%d' % i for i in range(n_real_files)]
    for file_ in files:
      self._insert_random_file(self.client, file_, 1024)
# Add nonexistent files to the sources and destinations
sources = files + [
from_path + 'X',
from_path + 'fake_directory_1/',
from_path + 'fake_directory_2/'
]
destinations += [
to_path + 'X',
to_path + 'fake_directory_1/',
to_path + 'fake_directory_2'
]
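    # Note the deliberate mismatch: the source fake_directory_2/ maps to a
    # destination without a trailing slash, which should surface as the 400
    # error checked below.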
result = self.aws.copy_paths(list(zip(sources, destinations)))
    # copy_paths does not return a single result when given a directory
    # pair; it returns one result per file found under the source
    # directory. Since fake_directory_1/ contains no files, it contributes
    # no entries, leaving one fewer result than there are sources.
    self.assertEqual(len(result), len(sources) - 1)
    for _, _, err in result[:n_real_files]:
      self.assertIsNone(err)
    for _, _, err in result[n_real_files:]:
      self.assertIsInstance(err, messages.S3ClientError)
    # Because fake_directory_1/ contributed no entry, there are only two
    # error rows to check rather than three:
    # self.assertEqual(result[-3][2].code, 404)
    self.assertEqual(result[-2][2].code, 404)
    self.assertEqual(result[-1][2].code, 400)
# Clean up
self.aws.delete_files(files)
self.aws.delete_files(destinations)
def test_copy_tree(self):
src_dir_name = self.TEST_DATA_PATH + 'source/'
dest_dir_name = self.TEST_DATA_PATH + 'dest/'
file_size = 1024
paths = ['a', 'b/c', 'b/d']
for path in paths:
src_file_name = src_dir_name + path
dest_file_name = dest_dir_name + path
self._insert_random_file(self.client, src_file_name, file_size)
      self.assertIn(src_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
      self.assertNotIn(
          dest_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
results = self.aws.copy_tree(src_dir_name, dest_dir_name)
for src_file_name, dest_file_name, err in results:
      self.assertIn(src_dir_name, src_file_name)
      self.assertIn(dest_dir_name, dest_file_name)
      self.assertIsNone(err)
      self.assertIn(src_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
      self.assertIn(
          dest_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
# Clean up
for path in paths:
src_file_name = src_dir_name + path
dest_file_name = dest_dir_name + path
self.aws.delete_files([src_file_name, dest_file_name])
def test_rename(self):
src_file_name = self.TEST_DATA_PATH + 'source'
dest_file_name = self.TEST_DATA_PATH + 'dest'
file_size = 1024
self._insert_random_file(self.client, src_file_name, file_size)
    self.assertIn(src_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
    self.assertNotIn(
        dest_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
    self.aws.rename(src_file_name, dest_file_name)
    self.assertNotIn(
        src_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
    self.assertIn(dest_file_name, self.aws.list_prefix(self.TEST_DATA_PATH))
# Clean up
self.aws.delete_files([src_file_name, dest_file_name])
def test_rename_files(self):
from_name_pattern = self.TEST_DATA_PATH + 'to_rename_%d'
to_name_pattern = self.TEST_DATA_PATH + 'been_renamed_%d'
file_size = 1024
num_files = 10
src_dest_pairs = [(from_name_pattern % i, to_name_pattern % i)
for i in range(num_files)]
result = self.aws.rename_files(src_dest_pairs)
self.assertTrue(result)
for i, (src, dest, exception) in enumerate(result):
self.assertEqual(src, from_name_pattern % i)
self.assertEqual(dest, to_name_pattern % i)
      self.assertIsInstance(exception, messages.S3ClientError)
self.assertEqual(exception.code, 404)
self.assertFalse(self.aws.exists(from_name_pattern % i))
self.assertFalse(self.aws.exists(to_name_pattern % i))
# Insert some files.
for i in range(num_files):
self._insert_random_file(self.client, from_name_pattern % i, file_size)
# Check files inserted properly.
for i in range(num_files):
self.assertTrue(self.aws.exists(from_name_pattern % i))
self.assertFalse(self.aws.exists(to_name_pattern % i))
# Execute batch rename.
self.aws.rename_files(src_dest_pairs)
# Check files were renamed properly.
for i in range(num_files):
self.assertFalse(self.aws.exists(from_name_pattern % i))
self.assertTrue(self.aws.exists(to_name_pattern % i))
# Clean up
all_files = set().union(*[set(pair) for pair in src_dest_pairs])
self.aws.delete_files(all_files)
def test_rename_files_with_errors(self):
real_prefix = self.TEST_DATA_PATH + 'rename_batch_%s'
fake_prefix = 's3://fake-bucket-68ae4b0ef7b9/rename_batch_%s'
src_dest_pairs = [(prefix % 'src', prefix % 'dest')
for prefix in (real_prefix, fake_prefix)]
# Create the file in the real bucket
self._insert_random_file(self.client, real_prefix % 'src', 1024)
# Execute batch rename
result = self.aws.rename_files(src_dest_pairs)
# First is the file in the real bucket, which shouldn't throw an error
self.assertEqual(result[0][0], src_dest_pairs[0][0])
self.assertEqual(result[0][1], src_dest_pairs[0][1])
self.assertIsNone(result[0][2])
# Second is the file in the fake bucket, which should throw a 404
self.assertEqual(result[1][0], src_dest_pairs[1][0])
self.assertEqual(result[1][1], src_dest_pairs[1][1])
self.assertEqual(result[1][2].code, 404)
# Clean up
self.aws.delete(real_prefix % 'dest')
def test_rename_files_with_errors_directory(self):
# Make file
dir_name = self.TEST_DATA_PATH + 'rename_dir/'
file_name = dir_name + 'file'
self._insert_random_file(self.client, file_name, 1024)
self.assertTrue(self.aws.exists(file_name))
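    # rename_files operates on individual objects; a directory-style
    # destination (with a trailing slash) should be rejected outright.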
with self.assertRaises(ValueError):
self.aws.rename_files([(file_name, self.TEST_DATA_PATH + 'dir_dest/')])
# Clean up
self.aws.delete(file_name)
def test_delete_paths(self):
# Make files
prefix = self.TEST_DATA_PATH + 'delete_paths/'
file_names = [prefix + 'a', prefix + 'b/c']
for file_name in file_names:
self._insert_random_file(self.client, file_name, 1024)
self.assertTrue(self.aws.exists(file_names[0]))
self.assertTrue(self.aws.exists(file_names[1]))
# Delete paths
paths = [prefix + 'a', prefix + 'b/']
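    # 'a' names an object directly, while 'b/' is a directory-style prefix
    # that should match and remove 'b/c'.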
self.aws.delete_paths(paths)
self.assertFalse(self.aws.exists(file_names[0]))
self.assertFalse(self.aws.exists(file_names[1]))
def test_delete(self):
file_name = self.TEST_DATA_PATH + 'delete_file'
file_size = 1024
# Test deletion of non-existent file (shouldn't raise any error)
self.aws.delete(file_name)
# Create the file and check that it was created
self._insert_random_file(self.aws.client, file_name, file_size)
files = self.aws.list_prefix(self.TEST_DATA_PATH)
    self.assertIn(file_name, files)
# Delete the file and check that it was deleted
self.aws.delete(file_name)
self.assertFalse(self.aws.exists(file_name))
def test_delete_files(self, *unused_args):
file_name_pattern = self.TEST_DATA_PATH + 'delete_batch/%d'
file_size = 1024
num_files = 5
# Test deletion of non-existent files.
result = self.aws.delete_files(
[file_name_pattern % i for i in range(num_files)])
self.assertTrue(result)
for i, (file_name, exception) in enumerate(result):
self.assertEqual(file_name, file_name_pattern % i)
      self.assertIsNone(exception)
self.assertFalse(self.aws.exists(file_name_pattern % i))
# Insert some files.
for i in range(num_files):
self._insert_random_file(self.client, file_name_pattern % i, file_size)
# Check files inserted properly.
for i in range(num_files):
self.assertTrue(self.aws.exists(file_name_pattern % i))
# Execute batch delete.
self.aws.delete_files([file_name_pattern % i for i in range(num_files)])
# Check files deleted properly.
for i in range(num_files):
self.assertFalse(self.aws.exists(file_name_pattern % i))
def test_delete_files_with_errors(self, *unused_args):
real_file = self.TEST_DATA_PATH + 'delete_batch/file'
fake_file = 's3://fake-bucket-68ae4b0ef7b9/delete_batch/file'
filenames = [real_file, fake_file]
result = self.aws.delete_files(filenames)
# First is the file in the real bucket, which shouldn't throw an error
self.assertEqual(result[0][0], filenames[0])
self.assertIsNone(result[0][1])
# Second is the file in the fake bucket, which should throw a 404
self.assertEqual(result[1][0], filenames[1])
self.assertEqual(result[1][1].code, 404)
def test_delete_tree(self):
root_path = self.TEST_DATA_PATH + 'delete_tree/'
leaf_paths = ['a', 'b/c', 'b/d', 'b/d/e']
paths = [root_path + leaf for leaf in leaf_paths]
# Create file tree
file_size = 1024
for path in paths:
self._insert_random_file(self.client, path, file_size)
# Check that the files exist
for path in paths:
self.assertTrue(self.aws.exists(path))
# Delete the tree
self.aws.delete_tree(root_path)
# Check that the files have been deleted
for path in paths:
self.assertFalse(self.aws.exists(path))
def test_exists(self):
file_name = self.TEST_DATA_PATH + 'exists'
file_size = 1024
self.assertFalse(self.aws.exists(file_name))
self._insert_random_file(self.aws.client, file_name, file_size)
self.assertTrue(self.aws.exists(file_name))
# Clean up
self.aws.delete(file_name)
self.assertFalse(self.aws.exists(file_name))
def test_file_mode(self):
file_name = self.TEST_DATA_PATH + 'jerry/pigpen/bobby'
    with self.aws.open(file_name, 'w') as f:
      self.assertEqual(f.mode, 'w')
    with self.aws.open(file_name, 'r') as f:
      self.assertEqual(f.mode, 'r')
# Clean up
self.aws.delete(file_name)
def test_full_file_read(self):
file_name = self.TEST_DATA_PATH + 'jerry/pigpen/phil'
file_size = 1024
f = self._insert_random_file(self.aws.client, file_name, file_size)
contents = f.contents
f = self.aws.open(file_name)
self.assertEqual(f.mode, 'r')
f.seek(0, os.SEEK_END)
self.assertEqual(f.tell(), file_size)
self.assertEqual(f.read(), b'')
f.seek(0)
self.assertEqual(f.read(), contents)
# Clean up
self.aws.delete(file_name)
def test_file_write(self):
file_name = self.TEST_DATA_PATH + 'write_file'
file_size = 8 * 1024 * 1024 + 2000
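    # A payload comfortably larger than a single upload part (assuming the
    # writer's part size is well under 8 MiB) exercises the multipart write
    # path rather than a single PUT.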
contents = os.urandom(file_size)
f = self.aws.open(file_name, 'w')
self.assertEqual(f.mode, 'w')
f.write(contents[0:1000])
f.write(contents[1000:1024 * 1024])
f.write(contents[1024 * 1024:])
f.close()
    new_f = self.aws.open(file_name, 'r')
    new_f_contents = new_f.read()
    self.assertEqual(new_f_contents, contents)
# Clean up
self.aws.delete(file_name)
def test_file_mime_type(self):
if self.USE_MOCK:
self.skipTest("The boto3_client mock doesn't support mime_types")
mime_type = 'example/example'
file_name = self.TEST_DATA_PATH + 'write_file'
f = self.aws.open(file_name, 'w', mime_type=mime_type)
f.write(b'a string of binary text')
f.close()
bucket, key = s3io.parse_s3_path(file_name)
metadata = self.client.get_object_metadata(messages.GetRequest(bucket, key))
self.assertEqual(mime_type, metadata.mime_type)
# Clean up
self.aws.delete(file_name)
def test_file_random_seek(self):
file_name = self.TEST_DATA_PATH + 'write_seek_file'
file_size = 5 * 1024 * 1024 - 100
contents = os.urandom(file_size)
with self.aws.open(file_name, 'w') as wf:
wf.write(contents)
f = self.aws.open(file_name)
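    # Seed the RNG so the ten random seek/read windows below are
    # reproducible across runs.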
random.seed(0)
for _ in range(0, 10):
a = random.randint(0, file_size - 1)
b = random.randint(0, file_size - 1)
start, end = min(a, b), max(a, b)
f.seek(start)
self.assertEqual(f.tell(), start)
self.assertEqual(
f.read(end - start + 1), contents[start:end + 1]
)
self.assertEqual(f.tell(), end + 1)
# Clean up
self.aws.delete(file_name)
def test_file_flush(self):
file_name = self.TEST_DATA_PATH + 'flush_file'
file_size = 5 * 1024 * 1024 + 2000
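    # Sized so that flushed data crosses an upload part boundary (assuming a
    # part size of around 5 MiB), making flush() do real work at least once.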
contents = os.urandom(file_size)
f = self.aws.open(file_name, 'w')
self.assertEqual(f.mode, 'w')
f.write(contents[0:1000])
f.flush()
f.write(contents[1000:1024 * 1024])
f.flush()
f.flush() # Should be a NOOP.
f.write(contents[1024 * 1024:])
    f.close()  # This already calls the equivalent of flush() in its body.
new_f = self.aws.open(file_name, 'r')
new_f_contents = new_f.read()
    self.assertEqual(new_f_contents, contents)
# Clean up
self.aws.delete(file_name)
def test_file_iterator(self):
file_name = self.TEST_DATA_PATH + 'iterate_file'
lines = []
line_count = 10
for _ in range(line_count):
line_length = random.randint(100, 500)
line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
lines.append(line)
contents = b''.join(lines)
with self.aws.open(file_name, 'w') as wf:
wf.write(contents)
f = self.aws.open(file_name)
read_lines = 0
for line in f:
read_lines += 1
self.assertEqual(read_lines, line_count)
# Clean up
self.aws.delete(file_name)
def test_file_read_line(self):
file_name = self.TEST_DATA_PATH + 'read_line_file'
lines = []
# Set a small buffer size to exercise refilling the buffer.
# First line is carefully crafted so the newline falls as the last character
# of the buffer to exercise this code path.
read_buffer_size = 1099
    lines.append(b'x' * 1098 + b'\n')
for _ in range(1, 1000):
line_length = random.randint(100, 500)
line = os.urandom(line_length).replace(b'\n', b' ') + b'\n'
lines.append(line)
contents = b''.join(lines)
file_size = len(contents)
with self.aws.open(file_name, 'wb') as wf:
wf.write(contents)
f = self.aws.open(file_name, 'rb', read_buffer_size=read_buffer_size)
# Test read of first two lines.
f.seek(0)
self.assertEqual(f.readline(), lines[0])
self.assertEqual(f.tell(), len(lines[0]))
self.assertEqual(f.readline(), lines[1])
# Test read at line boundary.
f.seek(file_size - len(lines[-1]) - 1)
self.assertEqual(f.readline(), b'\n')
# Test read at end of file.
f.seek(file_size)
self.assertEqual(f.readline(), b'')
# Test reads at random positions.
random.seed(0)
for _ in range(0, 10):
start = random.randint(0, file_size - 1)
line_index = 0
# Find line corresponding to start index.
chars_left = start
while True:
next_line_length = len(lines[line_index])
if chars_left - next_line_length < 0:
break
chars_left -= next_line_length
line_index += 1
f.seek(start)
self.assertEqual(f.readline(), lines[line_index][chars_left:])
# Clean up
self.aws.delete(file_name)
def test_file_close(self):
file_name = self.TEST_DATA_PATH + 'close_file'
file_size = 5 * 1024 * 1024 + 2000
contents = os.urandom(file_size)
f = self.aws.open(file_name, 'w')
self.assertEqual(f.mode, 'w')
f.write(contents)
f.close()
f.close() # This should not crash.
with self.aws.open(file_name, 'r') as f:
read_contents = f.read()
    self.assertEqual(read_contents, contents)
# Clean up
self.aws.delete(file_name)
def test_context_manager(self):
# Test writing with a context manager.
file_name = self.TEST_DATA_PATH + 'context_manager_file'
file_size = 1024
contents = os.urandom(file_size)
with self.aws.open(file_name, 'w') as f:
f.write(contents)
with self.aws.open(file_name, 'r') as f:
self.assertEqual(f.read(), contents)
# Clean up
self.aws.delete(file_name)
def test_list_prefix(self):
objects = [
('jerry/pigpen/phil', 5),
('jerry/pigpen/bobby', 3),
('jerry/billy/bobby', 4),
]
for (object_name, size) in objects:
file_name = self.TEST_DATA_PATH + object_name
self._insert_random_file(self.aws.client, file_name, size)
test_cases = [
(self.TEST_DATA_PATH + 'j', [
('jerry/pigpen/phil', 5),
('jerry/pigpen/bobby', 3),
('jerry/billy/bobby', 4),
]),
(self.TEST_DATA_PATH + 'jerry/', [
('jerry/pigpen/phil', 5),
('jerry/pigpen/bobby', 3),
('jerry/billy/bobby', 4),
]),
(self.TEST_DATA_PATH + 'jerry/pigpen/phil', [
('jerry/pigpen/phil', 5),
]),
]
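    # list_prefix returns a dict mapping object path to size, so compare the
    # items as sets of (path, size) pairs.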
for file_pattern, expected_object_names in test_cases:
expected_file_names = [(self.TEST_DATA_PATH + object_name, size)
for (object_name, size) in expected_object_names]
self.assertEqual(
set(self.aws.list_prefix(file_pattern).items()),
set(expected_file_names))
# Clean up
    for (object_name, _) in objects:
      self.aws.delete(self.TEST_DATA_PATH + object_name)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()