blob: f1d5d593769bfada8924d7ab26053cba395de190 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import sys
import platform
import shutil
import unittest
import time
import tempfile
import multiprocessing
from libcloud.common.types import LibcloudError
from libcloud.storage.base import Container
from libcloud.storage.base import Object
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ContainerAlreadyExistsError
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.utils.files import exhaust_iterator
try:
from libcloud.storage.drivers.local import LocalStorageDriver
from libcloud.storage.drivers.local import LockLocalStorage
import fasteners
except ImportError:
print('fasteners library is not available, skipping local_storage tests...')
LocalStorageDriver = None
class LocalTests(unittest.TestCase):
driver_type = LocalStorageDriver
@classmethod
def create_driver(self):
self.key = tempfile.mkdtemp()
return self.driver_type(self.key, None)
def setUp(self):
self.driver = self.create_driver()
def tearDown(self):
shutil.rmtree(self.key)
self.key = None
def make_tmp_file(self, content=None):
if not content:
content = b'blah' * 1024
_, tmppath = tempfile.mkstemp()
with open(tmppath, 'wb') as fp:
fp.write(content)
return tmppath
def remove_tmp_file(self, tmppath):
try:
os.unlink(tmppath)
except Exception as e:
msg = str(e)
if 'being used by another process' in msg and platform.system().lower() == 'windows':
return
raise e
@unittest.skipIf(platform.system().lower() == 'windows', 'Unsupported on Windows')
def test_lock_local_storage(self):
# 1. Acquire succeeds
lock = LockLocalStorage("/tmp/a")
with lock:
self.assertTrue(True)
# 2. Acquire fails because lock is already acquired
lock = LockLocalStorage("/tmp/b", timeout=0.5)
with lock:
expected_msg = "Failed to acquire thread lock"
self.assertRaisesRegex(LibcloudError, expected_msg, lock.__enter__)
# 3. Multiprocessing scenario where IPC lock is involved
def acquire_lock_in_subprocess(pid, success):
# For first process acquire should succeed and for the second it should fail
lock = LockLocalStorage("/tmp/c", timeout=0.5)
if pid == 1:
with lock:
time.sleep(1)
success.value = 1
elif pid == 2:
expected_msg = "Failed to acquire IPC lock"
self.assertRaisesRegex(LibcloudError, expected_msg, lock.__enter__)
success.value = 1
else:
raise ValueError("Invalid pid")
success_1 = multiprocessing.Value('i', 0)
success_2 = multiprocessing.Value('i', 0)
p1 = multiprocessing.Process(target=acquire_lock_in_subprocess, args=(1, success_1,))
p1.start()
p2 = multiprocessing.Process(target=acquire_lock_in_subprocess, args=(2, success_2,))
p2.start()
p1.join()
p2.join()
self.assertEqual(bool(success_1.value), True, "Check didn't pass")
self.assertEqual(bool(success_2.value), True, "Second check didn't pass")
def test_list_containers_empty(self):
containers = self.driver.list_containers()
self.assertEqual(len(containers), 0)
def test_containers_success(self):
self.driver.create_container('test1')
self.driver.create_container('test2')
containers = self.driver.list_containers()
self.assertEqual(len(containers), 2)
container = containers[1]
self.assertTrue('creation_time' in container.extra)
self.assertTrue('modify_time' in container.extra)
self.assertTrue('access_time' in container.extra)
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 0)
objects = container.list_objects()
self.assertEqual(len(objects), 0)
for container in containers:
self.driver.delete_container(container)
def test_objects_success(self):
tmppath = self.make_tmp_file()
container = self.driver.create_container('test3')
obj1 = container.upload_object(tmppath, 'object1')
obj2 = container.upload_object(tmppath, 'path/object2')
obj3 = container.upload_object(tmppath, 'path/to/object3')
obj4 = container.upload_object(tmppath, 'path/to/object4.ext')
with open(tmppath, 'rb') as tmpfile:
obj5 = container.upload_object_via_stream(tmpfile, 'object5')
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 5)
prefix = os.path.join('path', 'to')
objects = self.driver.list_container_objects(container=container,
prefix=prefix)
self.assertEqual(len(objects), 2)
for obj in objects:
self.assertNotEqual(obj.hash, None)
self.assertEqual(obj.size, 4096)
self.assertEqual(obj.container.name, 'test3')
self.assertTrue('creation_time' in obj.extra)
self.assertTrue('modify_time' in obj.extra)
self.assertTrue('access_time' in obj.extra)
obj1.delete()
obj2.delete()
objects = container.list_objects()
self.assertEqual(len(objects), 3)
container.delete_object(obj3)
container.delete_object(obj4)
container.delete_object(obj5)
objects = container.list_objects()
self.assertEqual(len(objects), 0)
container.delete()
self.remove_tmp_file(tmppath)
def test_get_container_doesnt_exist(self):
try:
self.driver.get_container(container_name='container1')
except ContainerDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_get_container_success(self):
self.driver.create_container('test4')
container = self.driver.get_container(container_name='test4')
self.assertTrue(container.name, 'test4')
container.delete()
def test_get_object_container_doesnt_exist(self):
try:
self.driver.get_object(container_name='test-inexistent',
object_name='test')
except ContainerDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_get_object_success(self):
tmppath = self.make_tmp_file()
container = self.driver.create_container('test5')
container.upload_object(tmppath, 'test')
obj = self.driver.get_object(container_name='test5',
object_name='test')
self.assertEqual(obj.name, 'test')
self.assertEqual(obj.container.name, 'test5')
self.assertEqual(obj.size, 4096)
self.assertNotEqual(obj.hash, None)
self.assertTrue('creation_time' in obj.extra)
self.assertTrue('modify_time' in obj.extra)
self.assertTrue('access_time' in obj.extra)
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
def test_create_container_invalid_name(self):
try:
self.driver.create_container(container_name='new/container')
except InvalidContainerNameError:
pass
else:
self.fail('Exception was not thrown')
def test_create_container_already_exists(self):
container = self.driver.create_container(
container_name='new-container')
try:
self.driver.create_container(container_name='new-container')
except ContainerAlreadyExistsError:
pass
else:
self.fail('Exception was not thrown')
# success
self.driver.delete_container(container)
def test_create_container_success(self):
name = 'new_container'
container = self.driver.create_container(container_name=name)
self.assertEqual(container.name, name)
self.driver.delete_container(container)
def test_delete_container_doesnt_exist(self):
container = Container(name='new_container', extra=None,
driver=self.driver)
try:
self.driver.delete_container(container=container)
except ContainerDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_delete_container_not_empty(self):
tmppath = self.make_tmp_file()
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
try:
self.driver.delete_container(container=container)
except ContainerIsNotEmptyError:
pass
else:
self.fail('Exception was not thrown')
# success
obj.delete()
self.remove_tmp_file(tmppath)
self.assertTrue(self.driver.delete_container(container=container))
def test_delete_container_not_found(self):
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
try:
self.driver.delete_container(container=container)
except ContainerDoesNotExistError:
pass
else:
self.fail('Container does not exist but an exception was not' +
'thrown')
def test_delete_container_success(self):
container = self.driver.create_container('test7')
self.assertTrue(self.driver.delete_container(container=container))
def test_download_object_success(self):
tmppath = self.make_tmp_file()
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
destination_path = tmppath + '.temp'
result = self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
self.assertTrue(result)
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
os.unlink(destination_path)
def test_download_object_and_overwrite(self):
tmppath = self.make_tmp_file()
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
destination_path = tmppath + '.temp'
result = self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
self.assertTrue(result)
try:
self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
except LibcloudError:
pass
else:
self.fail('Exception was not thrown')
result = self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=True,
delete_on_failure=True)
self.assertTrue(result)
# success
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
os.unlink(destination_path)
def test_download_object_as_stream_success(self):
tmppath = self.make_tmp_file()
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
stream = self.driver.download_object_as_stream(obj=obj,
chunk_size=1024)
self.assertTrue(hasattr(stream, '__iter__'))
data = b''.join(stream)
self.assertTrue(len(data), 4096)
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
def test_download_object_range_success(self):
content = b'0123456789123456789'
tmppath = self.make_tmp_file(content=content)
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
destination_path = tmppath + '.temp'
# 1. Only start_bytes provided
result = self.driver.download_object_range(obj=obj,
destination_path=destination_path,
start_bytes=4,
overwrite_existing=True,
delete_on_failure=True)
self.assertTrue(result)
with open(destination_path, 'rb') as fp:
written_content = fp.read()
self.assertEqual(written_content, b'456789123456789')
self.assertEqual(written_content, content[4:])
# 2. start_bytes and end_bytes is provided
result = self.driver.download_object_range(obj=obj,
destination_path=destination_path,
start_bytes=4,
end_bytes=6,
overwrite_existing=True,
delete_on_failure=True)
self.assertTrue(result)
with open(destination_path, 'rb') as fp:
written_content = fp.read()
self.assertEqual(written_content, b'45')
self.assertEqual(written_content, content[4:6])
result = self.driver.download_object_range(obj=obj,
destination_path=destination_path,
start_bytes=0,
end_bytes=1,
overwrite_existing=True,
delete_on_failure=True)
self.assertTrue(result)
with open(destination_path, 'rb') as fp:
written_content = fp.read()
self.assertEqual(written_content, b'0')
self.assertEqual(written_content, content[0:1])
result = self.driver.download_object_range(obj=obj,
destination_path=destination_path,
start_bytes=0,
end_bytes=2,
overwrite_existing=True,
delete_on_failure=True)
self.assertTrue(result)
with open(destination_path, 'rb') as fp:
written_content = fp.read()
self.assertEqual(written_content, b'01')
self.assertEqual(written_content, content[0:2])
result = self.driver.download_object_range(obj=obj,
destination_path=destination_path,
start_bytes=0,
end_bytes=len(content),
overwrite_existing=True,
delete_on_failure=True)
self.assertTrue(result)
with open(destination_path, 'rb') as fp:
written_content = fp.read()
self.assertEqual(written_content, b'0123456789123456789')
self.assertEqual(written_content, content[0:len(content)])
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
os.unlink(destination_path)
def test_download_object_range_as_stream_success(self):
content = b'0123456789123456789'
tmppath = self.make_tmp_file(content=content)
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
# 1. Only start_bytes provided
stream = self.driver.download_object_range_as_stream(obj=obj,
start_bytes=4,
chunk_size=1024)
written_content = b''.join(stream)
self.assertEqual(written_content, b'456789123456789')
self.assertEqual(written_content, content[4:])
# 2. start_bytes and end_bytes is provided
stream = self.driver.download_object_range_as_stream(obj=obj,
start_bytes=4,
end_bytes=7,
chunk_size=1024)
written_content = b''.join(stream)
self.assertEqual(written_content, b'456')
self.assertEqual(written_content, content[4:7])
stream = self.driver.download_object_range_as_stream(obj=obj,
start_bytes=0,
end_bytes=1,
chunk_size=1024)
written_content = b''.join(stream)
self.assertEqual(written_content, b'0')
self.assertEqual(written_content, content[0:1])
stream = self.driver.download_object_range_as_stream(obj=obj,
start_bytes=1,
end_bytes=3,
chunk_size=1024)
written_content = b''.join(stream)
self.assertEqual(written_content, b'12')
self.assertEqual(written_content, content[1:3])
stream = self.driver.download_object_range_as_stream(obj=obj,
start_bytes=0,
end_bytes=len(content),
chunk_size=1024)
written_content = b''.join(stream)
self.assertEqual(written_content, b'0123456789123456789')
self.assertEqual(written_content, content[0:len(content)])
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
def test_download_object_range_invalid_values(self):
obj = Object('a', 500, '', {}, {}, None, None)
tmppath = self.make_tmp_file(content='')
expected_msg = 'start_bytes must be greater than 0'
self.assertRaisesRegex(ValueError, expected_msg,
self.driver.download_object_range, obj=obj,
destination_path=tmppath,
start_bytes=-1)
expected_msg = 'start_bytes must be smaller than end_bytes'
self.assertRaisesRegex(ValueError, expected_msg,
self.driver.download_object_range, obj=obj,
destination_path=tmppath,
start_bytes=5,
end_bytes=4)
expected_msg = 'start_bytes and end_bytes can\'t be the same'
self.assertRaisesRegex(ValueError, expected_msg,
self.driver.download_object_range, obj=obj,
destination_path=tmppath,
start_bytes=5,
end_bytes=5)
def test_download_object_range_as_stream_invalid_values(self):
content = b'0123456789123456789'
tmppath = self.make_tmp_file(content=content)
container = self.driver.create_container('test6')
obj = container.upload_object(tmppath, 'test')
expected_msg = 'start_bytes must be greater than 0'
stream = self.driver.download_object_range_as_stream(
obj=obj,
start_bytes=-1,
end_bytes=None,
chunk_size=1024)
self.assertRaisesRegex(ValueError, expected_msg, exhaust_iterator,
stream)
expected_msg = 'start_bytes must be smaller than end_bytes'
stream = self.driver.download_object_range_as_stream(
obj=obj,
start_bytes=5,
end_bytes=4,
chunk_size=1024)
self.assertRaisesRegex(ValueError, expected_msg, exhaust_iterator,
stream)
expected_msg = 'end_bytes is larger than file size'
stream = self.driver.download_object_range_as_stream(
obj=obj,
start_bytes=5,
end_bytes=len(content) + 1,
chunk_size=1024)
expected_msg = 'start_bytes and end_bytes can\'t be the same'
stream = self.driver.download_object_range_as_stream(
obj=obj,
start_bytes=5,
end_bytes=5,
chunk_size=1024)
obj.delete()
container.delete()
self.remove_tmp_file(tmppath)
if not LocalStorageDriver:
class LocalTests(unittest.TestCase): # NOQA
pass
if __name__ == '__main__':
sys.exit(unittest.main())