blob: 67797b1736b91ae0aef4c0310f256291d9bf9a05 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import logging
import os
import unittest
from apache_beam.io.aws import s3io
from apache_beam.io.aws.clients.s3 import fake_client
from apache_beam.io.aws.clients.s3 import messages
from apache_beam.options import pipeline_options
class ClientErrorTest(unittest.TestCase):
  """Tests that invalid S3 requests surface ``messages.S3ClientError`` with
  the expected HTTP status code (404 for missing resources, 400 for
  malformed requests).

  These tests can be run locally against a mock S3 client, or as
  integration tests against the real S3 client (see ``setUp``).
  """
  def setUp(self):
    # These tests can be run locally against a mock S3 client, or as
    # integration tests against the real S3 client.
    self.USE_MOCK = True

    # If you're running integration tests with S3, set this variable to be an
    # s3 path that you have access to where test data can be written. If
    # you're just running tests against the mock, this can be any s3 path. It
    # should end with a '/'.
    self.TEST_DATA_PATH = 's3://random-data-sets/beam_tests/'
    self.test_bucket, self.test_path = s3io.parse_s3_path(self.TEST_DATA_PATH)

    if self.USE_MOCK:
      self.client = fake_client.FakeS3Client()
      test_data_bucket, _ = s3io.parse_s3_path(self.TEST_DATA_PATH)
      self.client.known_buckets.add(test_data_bucket)
      self.aws = s3io.S3IO(self.client)
    else:
      self.aws = s3io.S3IO(options=pipeline_options.S3Options())

  def test_get_object_metadata(self):
    # Requesting metadata for a nonexistent object must raise a 404.
    object_key = self.test_path + 'nonexistent_file_doesnt_exist'
    request = messages.GetRequest(self.test_bucket, object_key)
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.get_object_metadata(request)
    self.assertEqual(ctx.exception.code, 404)

  def test_get_range_nonexistent(self):
    # Reading a byte range of a nonexistent object must raise a 404.
    object_key = self.test_path + 'nonexistent_file_doesnt_exist'
    request = messages.GetRequest(self.test_bucket, object_key)
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.get_range(request, 0, 10)
    self.assertEqual(ctx.exception.code, 404)

  def test_get_range_bad_start_end(self):
    # A negative start or an inverted (start > end) range is expected to
    # yield the full object contents rather than an error.
    file_name = self.TEST_DATA_PATH + 'get_range'
    contents = os.urandom(1024)
    with self.aws.open(file_name, 'w') as f:
      f.write(contents)
    bucket, object_key = s3io.parse_s3_path(file_name)

    response = self.client.get_range(
        messages.GetRequest(bucket, object_key), -10, 20)
    self.assertEqual(response, contents)

    response = self.client.get_range(
        messages.GetRequest(bucket, object_key), 20, 10)
    self.assertEqual(response, contents)

    # Clean up
    self.aws.delete(file_name)

  def test_upload_part_nonexistent_upload_id(self):
    # Uploading a part under an unknown multipart-upload id must raise a 404.
    object_key = self.test_path + 'upload_part'
    upload_id = 'not-an-id-12345'
    part_number = 1
    contents = os.urandom(1024)
    request = messages.UploadPartRequest(
        self.test_bucket, object_key, upload_id, part_number, contents)
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.upload_part(request)
    self.assertEqual(ctx.exception.code, 404)

  def test_copy_nonexistent(self):
    # Copying from a nonexistent source key must raise a 404.
    src_key = self.test_path + 'not_a_real_file_does_not_exist'
    dest_key = self.test_path + 'destination_file_location'
    request = messages.CopyRequest(
        self.test_bucket, src_key, self.test_bucket, dest_key)
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.copy(request)
    self.assertEqual(ctx.exception.code, 404)

  def test_upload_part_bad_number(self):
    # A non-integral part number is a malformed request and must raise a 400.
    object_key = self.test_path + 'upload_part'
    contents = os.urandom(1024)
    request = messages.UploadRequest(self.test_bucket, object_key, None)
    response = self.client.create_multipart_upload(request)
    upload_id = response.upload_id

    part_number = 0.5
    request = messages.UploadPartRequest(
        self.test_bucket, object_key, upload_id, part_number, contents)
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.upload_part(request)
    self.assertEqual(ctx.exception.code, 400)

  def test_complete_multipart_upload_too_small(self):
    # Completing an upload whose non-final parts are smaller than the S3
    # minimum part size must raise a 400.
    object_key = self.test_path + 'upload_part'
    request = messages.UploadRequest(self.test_bucket, object_key, None)
    response = self.client.create_multipart_upload(request)
    upload_id = response.upload_id

    part_number = 1
    contents_1 = os.urandom(1024)
    request_1 = messages.UploadPartRequest(
        self.test_bucket, object_key, upload_id, part_number, contents_1)
    response_1 = self.client.upload_part(request_1)

    part_number = 2
    contents_2 = os.urandom(1024)
    request_2 = messages.UploadPartRequest(
        self.test_bucket, object_key, upload_id, part_number, contents_2)
    response_2 = self.client.upload_part(request_2)

    parts = [{
        'PartNumber': 1, 'ETag': response_1.etag
    }, {
        'PartNumber': 2, 'ETag': response_2.etag
    }]
    complete_request = messages.CompleteMultipartUploadRequest(
        self.test_bucket, object_key, upload_id, parts)
    # Use assertRaises so the test fails (rather than passing vacuously)
    # if no exception is raised.
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.complete_multipart_upload(complete_request)
    self.assertEqual(ctx.exception.code, 400)

  def test_complete_multipart_upload_too_many(self):
    # Completing an upload that references a part number which was never
    # uploaded must raise a 400.
    object_key = self.test_path + 'upload_part'
    request = messages.UploadRequest(self.test_bucket, object_key, None)
    response = self.client.create_multipart_upload(request)
    upload_id = response.upload_id

    part_number = 1
    contents_1 = os.urandom(5 * 1024)
    request_1 = messages.UploadPartRequest(
        self.test_bucket, object_key, upload_id, part_number, contents_1)
    response_1 = self.client.upload_part(request_1)

    part_number = 2
    contents_2 = os.urandom(1024)
    request_2 = messages.UploadPartRequest(
        self.test_bucket, object_key, upload_id, part_number, contents_2)
    response_2 = self.client.upload_part(request_2)

    parts = [
        {
            'PartNumber': 1, 'ETag': response_1.etag
        },
        {
            'PartNumber': 2, 'ETag': response_2.etag
        },
        {
            'PartNumber': 3, 'ETag': 'fake-etag'
        },
    ]
    complete_request = messages.CompleteMultipartUploadRequest(
        self.test_bucket, object_key, upload_id, parts)
    # Use assertRaises so the test fails (rather than passing vacuously)
    # if no exception is raised.
    with self.assertRaises(messages.S3ClientError) as ctx:
      self.client.complete_multipart_upload(complete_request)
    self.assertEqual(ctx.exception.code, 400)
if __name__ == '__main__':
  # Emit INFO-level logs when the tests are run directly as a script.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()