# blob: 69d37059b8285b2808ec77c6e00ec3044b4e5831 (repository-viewer header; not part of the module)
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import random
import socket
import string
import os.path
import platform
import unittest
import warnings
from itertools import chain
import pytest
import libcloud.utils.files
from libcloud.utils.py3 import StringIO, b, bchr, urlquote, hexadigits
from libcloud.utils.misc import get_driver, set_driver, get_secure_random_string
from libcloud.common.types import LibcloudError
from libcloud.compute.types import Provider
from libcloud.utils.publickey import get_pubkey_ssh2_fingerprint, get_pubkey_openssh_fingerprint
from libcloud.utils.decorators import wrap_non_libcloud_exceptions
from libcloud.utils.networking import (
is_public_subnet,
is_private_subnet,
join_ipv4_segments,
is_valid_ip_address,
increment_ipv4_segments,
)
from libcloud.compute.providers import DRIVERS
from libcloud.compute.drivers.dummy import DummyNodeDriver
from libcloud.storage.drivers.dummy import DummyIterator
# In Python > 2.7 DeprecationWarnings are disabled by default, so switch the
# filter to "default" for this module's tests.
warnings.simplefilter("default")
# Module-level capture list: show_warning() appends one
# (message, category, filename, lineno) tuple per intercepted warning, and
# TestUtils.setUp/tearDown rebind it to a fresh empty list.
WARNINGS_BUFFER = []
from io import FileIO as file
def show_warning(msg, cat, fname, lno, file=None, line=None):
    """Record a warning in ``WARNINGS_BUFFER`` instead of displaying it.

    The parameter list mirrors the hook signature this function is assigned
    to (``warnings.showwarning``); ``file`` and ``line`` are accepted but
    ignored.

    :param msg: Warning message object.
    :param cat: Warning category.
    :param fname: Name of the file the warning was raised from.
    :param lno: Line number the warning was raised from.
    """
    captured = (msg, cat, fname, lno)
    WARNINGS_BUFFER.append(captured)
# Keep a reference to the hook that was installed at import time so that
# TestUtils.tearDown can put it back after a test swaps in show_warning.
original_func = warnings.showwarning
class TestUtils(unittest.TestCase):
    """Tests for the generic helpers under ``libcloud.utils``.

    Covers driver lookup/registration, the deprecation / in-development
    warning switches, the chunked-reading helpers in ``libcloud.utils.files``
    and a few ``libcloud.utils.py3`` shims.

    Notes on individual tests are written as ``#`` comments rather than
    docstrings so that verbose unittest output keeps showing method names.
    """

    def setUp(self):
        # Every test starts with an empty warning capture buffer.
        global WARNINGS_BUFFER
        WARNINGS_BUFFER = []

    def tearDown(self):
        # Drop captured warnings and restore the hook saved at import time.
        global WARNINGS_BUFFER
        WARNINGS_BUFFER = []
        warnings.showwarning = original_func

    # ------------------------------------------------------------------
    # Private helpers (not collected as tests)
    # ------------------------------------------------------------------

    def _assert_uniform_chunks(self, chunks, expected_chunk, expected_count):
        """Assert *chunks* yields *expected_chunk* exactly *expected_count* times."""
        seen = 0
        for chunk in chunks:
            seen += 1
            self.assertEqual(chunk, expected_chunk)
        self.assertEqual(seen, expected_count)

    def _assert_warning_switch(self, flag_name, emit_warning):
        """Check that ``libcloud.utils.<flag_name>`` gates *emit_warning*.

        The flag's value on entry is restored through ``addCleanup`` so a
        failing assertion cannot leave the module-level switch flipped for
        the tests that run afterwards.
        """
        warnings.showwarning = show_warning
        self.addCleanup(
            setattr, libcloud.utils, flag_name, getattr(libcloud.utils, flag_name)
        )

        # Switch off: nothing may reach the capture buffer.
        setattr(libcloud.utils, flag_name, False)
        self.assertEqual(len(WARNINGS_BUFFER), 0)
        emit_warning("test_module")
        self.assertEqual(len(WARNINGS_BUFFER), 0)

        # Switch on: exactly one warning is captured.
        setattr(libcloud.utils, flag_name, True)
        self.assertEqual(len(WARNINGS_BUFFER), 0)
        emit_warning("test_module")
        self.assertEqual(len(WARNINGS_BUFFER), 1)

    # ------------------------------------------------------------------
    # Files / mime type
    # ------------------------------------------------------------------

    @unittest.skipIf(platform.system().lower() == "windows", "Unsupported on Windows")
    def test_guess_file_mime_type(self):
        file_path = os.path.abspath(__file__)
        mimetype, _encoding = libcloud.utils.files.guess_file_mime_type(file_path=file_path)
        self.assertIn("python", mimetype)

    # ------------------------------------------------------------------
    # Driver registry
    # ------------------------------------------------------------------

    def test_get_driver(self):
        driver = get_driver(drivers=DRIVERS, provider=Provider.DUMMY)
        self.assertIsNotNone(driver)

        # An unknown provider name must be rejected.
        with self.assertRaises(AttributeError):
            get_driver(drivers=DRIVERS, provider="fooba")

    def test_get_driver_string_and_enum_notation(self):
        for provider in (Provider.DUMMY, "dummy", "DUMMY"):
            driver = get_driver(drivers=DRIVERS, provider=provider)
            self.assertEqual(driver, DummyNodeDriver)

    def test_set_driver(self):
        module = "libcloud.storage.drivers.dummy"
        klass = "DummyStorageDriver"

        # set_driver() writes into the shared DRIVERS dict; remove the keys
        # this test may add so it can be re-run in the same process.
        for key in ("testingset", "testingnew"):
            self.addCleanup(DRIVERS.pop, key, None)

        # Set an existing driver
        with self.assertRaises(AttributeError):
            set_driver(DRIVERS, Provider.DUMMY, module, klass)

        # Register a new driver
        driver = set_driver(DRIVERS, "testingset", module, klass)
        self.assertIsNotNone(driver)

        # Register it again
        with self.assertRaises(AttributeError):
            set_driver(DRIVERS, "testingset", module, klass)

        # Register an invalid module
        with self.assertRaises(ImportError):
            set_driver(DRIVERS, "testingnew", "libcloud.storage.drivers.dummy1", klass)

        # Register an invalid class
        with self.assertRaises(AttributeError):
            set_driver(DRIVERS, "testingnew", module, "DummyStorageDriver1")

    # ------------------------------------------------------------------
    # Warning switches
    # ------------------------------------------------------------------

    def test_deprecated_warning(self):
        self._assert_warning_switch(
            "SHOW_DEPRECATION_WARNING", libcloud.utils.deprecated_warning
        )

    def test_in_development_warning(self):
        self._assert_warning_switch(
            "SHOW_IN_DEVELOPMENT_WARNING", libcloud.utils.in_development_warning
        )

    # ------------------------------------------------------------------
    # read_in_chunks / exhaust_iterator
    # ------------------------------------------------------------------

    def test_read_in_chunks_iterator_no_data(self):
        iterator = DummyIterator()
        without_empty = libcloud.utils.files.read_in_chunks(iterator=iterator, yield_empty=False)
        with_empty = libcloud.utils.files.read_in_chunks(iterator=iterator, yield_empty=True)

        # yield_empty=False
        self._assert_uniform_chunks(without_empty, b(""), 0)

        # yield_empty=True
        self._assert_uniform_chunks(with_empty, b(""), 1)

    def test_read_in_chunks_iterator(self):
        def iterator():
            for _ in range(0, 1000):
                yield "aa"

        # Without fill_size every source item comes back as its own chunk.
        chunks = libcloud.utils.files.read_in_chunks(iterator(), chunk_size=10, fill_size=False)
        self._assert_uniform_chunks(chunks, b("aa"), 1000)

        # With fill_size the 2000 bytes are regrouped into 10-byte chunks.
        chunks = libcloud.utils.files.read_in_chunks(iterator(), chunk_size=10, fill_size=True)
        self._assert_uniform_chunks(chunks, b("a") * 10, 200)

    def test_read_in_chunks_large_iterator_batches(self):
        def iterator():
            for _ in range(0, 10):
                yield "a" * 10_000

        chunks = libcloud.utils.files.read_in_chunks(iterator(), chunk_size=10, fill_size=False)
        self._assert_uniform_chunks(chunks, b("a") * 10_000, 10)

        chunks = libcloud.utils.files.read_in_chunks(iterator(), chunk_size=10, fill_size=True)
        self._assert_uniform_chunks(chunks, b("a") * 10, 10_000)

    def test_read_in_chunks_filelike(self):
        class FakeFile(file):
            # Fake file object: 499 reads return ``size + 1`` bytes each, the
            # 500th read returns an empty string.
            def __init__(self):
                self.remaining = 500

            def read(self, size):
                self.remaining -= 1
                if self.remaining == 0:
                    return ""
                return "b" * (size + 1)

        # fill_size=False: the 499 oversized (11 byte) reads pass through as-is.
        chunks = libcloud.utils.files.read_in_chunks(FakeFile(), chunk_size=10, fill_size=False)
        self._assert_uniform_chunks(chunks, b("b" * 11), 499)

        # fill_size=True: 499 * 11 = 5489 bytes are regrouped into 548 full
        # 10-byte chunks followed by one 9-byte remainder.
        chunks = list(
            libcloud.utils.files.read_in_chunks(FakeFile(), chunk_size=10, fill_size=True)
        )
        self.assertEqual(len(chunks), 549)
        for chunk in chunks[:-1]:
            self.assertEqual(chunk, b("b" * 10))
        self.assertEqual(chunks[-1], b("b" * 9))

    def test_read_in_chunks_corectness(self):
        # NOTE: the method name keeps its historical spelling so that existing
        # test selections by name continue to work.
        mebibyte = 1024 * 1024
        # random.choices builds the 5 MiB payload in one call instead of one
        # random.choice call per character.
        data = "".join(random.choices(string.ascii_lowercase, k=5 * mebibyte)).encode("utf-8")
        chunk_sizes = (None, mebibyte, 2 * mebibyte, 5 * mebibyte, 10 * mebibyte)

        # Whatever the chunking parameters, reassembling the chunks must give
        # back the original payload.
        for fill_size in (False, True):
            for chunk_size in chunk_sizes:
                iterator = libcloud.utils.files.read_in_chunks(
                    iter([data]), chunk_size=chunk_size, fill_size=fill_size
                )
                self.assertEqual(
                    data,
                    libcloud.utils.files.exhaust_iterator(iterator),
                    "fill_size={!r}, chunk_size={!r}".format(fill_size, chunk_size),
                )

    def test_exhaust_iterator(self):
        def iterator_func():
            for _ in range(0, 1000):
                yield "aa"

        data = b("aa" * 1000)

        # Through read_in_chunks
        iterator = libcloud.utils.files.read_in_chunks(iterator=iterator_func())
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator)
        self.assertEqual(result, data)

        # Directly from a plain generator
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator_func())
        self.assertEqual(result, data)

        # From a file-like object
        data = "12345678990"
        iterator = StringIO(data)
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator)
        self.assertEqual(result, b(data))

    def test_exhaust_iterator_empty_iterator(self):
        data = ""
        iterator = StringIO(data)
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator)
        self.assertEqual(result, b(data))

    # ------------------------------------------------------------------
    # py3 / misc helpers
    # ------------------------------------------------------------------

    def test_unicode_urlquote(self):
        # Regression tests for LIBCLOUD-429
        cases = (
            # Note: this is a unicode literal
            ("\xe9", "%C3%A9"),
            # Unicode without unicode characters
            ("v=1", "v%3D1"),
            # Already-encoded bytestring without unicode characters
            (b("v=1"), "v%3D1"),
        )
        for value, expected in cases:
            uri = urlquote(value)
            self.assertEqual(b(uri), b(expected))

    def test_get_secure_random_string(self):
        for size in range(1, 500):
            value = get_secure_random_string(size=size)
            self.assertEqual(len(value), size)

    def test_hexadigits(self):
        self.assertEqual(hexadigits(b("")), [])
        self.assertEqual(hexadigits(b("a")), ["61"])
        self.assertEqual(hexadigits(b("AZaz09-")), ["41", "5a", "61", "7a", "30", "39", "2d"])

    def test_bchr(self):
        self.assertEqual(bchr(0), b"\x00")
        self.assertEqual(bchr(97), b"a")
class NetworkingUtilsTestCase(unittest.TestCase):
    """Tests for the IPv4/IPv6 helpers in ``libcloud.utils.networking``."""

    def test_is_public_and_is_private_subnet(self):
        # (addresses, expected is_public result, expected is_private result)
        groups = (
            (["213.151.0.8", "86.87.86.1", "8.8.8.8", "8.8.4.4"], True, False),
            (["192.168.1.100", "10.0.0.1", "172.16.0.0"], False, True),
        )

        for addresses, expect_public, expect_private in groups:
            for ip in addresses:
                public = is_public_subnet(ip=ip)
                private = is_private_subnet(ip=ip)

                if expect_public:
                    self.assertTrue(public)
                else:
                    self.assertFalse(public)

                if expect_private:
                    self.assertTrue(private)
                else:
                    self.assertFalse(private)

    def test_is_valid_ip_address(self):
        ipv4_ok = ["192.168.1.100", "10.0.0.1", "213.151.0.8", "77.77.77.77"]
        ipv4_bad = ["10.1", "256.256.256.256", "0.567.567.567", "192.168.0.257"]
        ipv6_ok = [
            "fe80::200:5aee:feaa:20a2",
            "2607:f0d0:1002:51::4",
            "2607:f0d0:1002:0051:0000:0000:0000:0004",
            "::1",
        ]
        ipv6_bad = ["2607:f0d", "2607:f0d0:0004"]

        # Well-formed addresses validate against their own family.
        for family, candidates in ((socket.AF_INET, ipv4_ok), (socket.AF_INET6, ipv6_ok)):
            for candidate in candidates:
                self.assertTrue(is_valid_ip_address(address=candidate, family=family))

        # Malformed addresses are rejected whichever family is requested.
        all_bad = ipv4_bad + ipv6_bad
        for family in (socket.AF_INET, socket.AF_INET6):
            for candidate in all_bad:
                self.assertFalse(is_valid_ip_address(address=candidate, family=family))

    def test_join_ipv4_segments(self):
        expectations = {
            ("127", "0", "0", "1"): "127.0.0.1",
            ("255", "255", "255", "0"): "255.255.255.0",
        }

        for octets, dotted in expectations.items():
            self.assertEqual(join_ipv4_segments(segments=octets), dotted)

    def test_increment_ipv4_segments(self):
        # Each entry: input octets -> dotted address expected after the increment.
        expectations = {
            ("127", "0", "0", "1"): "127.0.0.2",
            ("255", "255", "255", "0"): "255.255.255.1",
            ("254", "255", "255", "255"): "255.0.0.0",
            ("100", "1", "0", "255"): "100.1.1.0",
        }

        for octets, expected in expectations.items():
            bumped = increment_ipv4_segments(segments=octets)
            self.assertEqual(join_ipv4_segments(segments=bumped), expected)
class TestPublicKeyUtils(unittest.TestCase):
    """Tests for the fingerprint helpers in ``libcloud.utils.publickey``."""

    # Fixed RSA public key in OpenSSH "ssh-rsa <base64> <comment>" form; both
    # tests below fingerprint this same key.
    PUBKEY = (
        "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDOfbWSXOlqvYjZmRO84/lIoV4gvuX+"
        "P1lLg50MMg6jZjLZIlYY081XPRmuom0xY0+BO++J2KgLl7gxJ6xMsKK2VQ+TakdfAH20"
        "XfMcTohd/zVCeWsbqZQvEhVXBo4hPIktcfNz0u9Ez3EtInO+kb7raLcRhOVi9QmOkOrC"
        "WtQU9mS71AWJuqI9H0YAnTiI8Hs5bn2tpMIqmTXT3g2bwywC25x1Nx9Hy0/FP+KUL6Ag"
        "vDXv47l+TgSDfTBEkvq+IF1ITrnaOG+nRE02oZC6cwHYTifM/IOollkujxIQmi2Z+j66"
        "OHSrjnEQugr0FqGJF2ygKfIh/i2u3fVLM60qE2NN user@example"
    )

    def test_pubkey_openssh_fingerprint(self):
        expected = "35:22:13:5b:82:e2:5d:e1:90:8c:73:74:9f:ef:3b:d8"
        fingerprint = get_pubkey_openssh_fingerprint(self.PUBKEY)
        self.assertEqual(fingerprint, expected)

    def test_pubkey_ssh2_fingerprint(self):
        expected = "11:ad:5d:4c:5b:99:c9:80:7e:81:03:76:5a:25:9d:8c"
        fingerprint = get_pubkey_ssh2_fingerprint(self.PUBKEY)
        self.assertEqual(fingerprint, expected)
def test_decorator():
    # A plain Exception raised inside a function wrapped with
    # wrap_non_libcloud_exceptions must surface as LibcloudError.
    def raise_plain_exception():
        raise Exception("bork")

    wrapped = wrap_non_libcloud_exceptions(raise_plain_exception)

    with pytest.raises(LibcloudError):
        wrapped()
if __name__ == "__main__":
    # Run the unittest-style test cases when the file is executed directly and
    # hand whatever unittest.main() returns to sys.exit().
    outcome = unittest.main()
    sys.exit(outcome)