| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import sys |
| import socket |
| import os.path |
| import platform |
| import unittest |
| import warnings |
| from itertools import chain |
| |
| import pytest |
| |
| import libcloud.utils.files |
| from libcloud.utils.py3 import StringIO, b, bchr, urlquote, hexadigits |
| from libcloud.utils.misc import get_driver, set_driver, get_secure_random_string |
| from libcloud.common.types import LibcloudError |
| from libcloud.compute.types import Provider |
| from libcloud.utils.publickey import get_pubkey_ssh2_fingerprint, get_pubkey_openssh_fingerprint |
| from libcloud.utils.decorators import wrap_non_libcloud_exceptions |
| from libcloud.utils.networking import ( |
| is_public_subnet, |
| is_private_subnet, |
| join_ipv4_segments, |
| is_valid_ip_address, |
| increment_ipv4_segments, |
| ) |
| from libcloud.compute.providers import DRIVERS |
| from libcloud.compute.drivers.dummy import DummyNodeDriver |
| from libcloud.storage.drivers.dummy import DummyIterator |
| |
# DeprecationWarnings are disabled by default in Python > 2.7, so turn them back on.
warnings.simplefilter("default")

# Stand-in for the old builtin ``file`` type; the file-like test double subclasses it.
from io import FileIO as file

# Every warning intercepted by show_warning() is recorded here.
WARNINGS_BUFFER = []


def show_warning(msg, cat, fname, lno, file=None, line=None):
    """
    Replacement for ``warnings.showwarning`` which records the warning
    instead of printing it.

    The message, category, file name and line number are appended to
    ``WARNINGS_BUFFER`` as a single tuple; ``file`` and ``line`` are accepted
    but not used.
    """
    captured = (msg, cat, fname, lno)
    WARNINGS_BUFFER.append(captured)


# Reference to the real hook, kept so it can be put back after a test replaces it.
original_func = warnings.showwarning
| |
| |
class TestUtils(unittest.TestCase):
    """
    Tests for the general purpose helpers under ``libcloud.utils``: driver
    lookup and registration, deprecation / in-development warnings, chunked
    reading helpers and the py3 compatibility shims.
    """

    def setUp(self):
        # Every test starts with an empty warning capture buffer.
        global WARNINGS_BUFFER
        WARNINGS_BUFFER = []

    def tearDown(self):
        # Drop captured warnings and reinstate the real ``warnings.showwarning``
        # in case the test replaced it.
        global WARNINGS_BUFFER
        WARNINGS_BUFFER = []
        warnings.showwarning = original_func

    @unittest.skipIf(platform.system().lower() == "windows", "Unsupported on Windows")
    def test_guess_file_mime_type(self):
        file_path = os.path.abspath(__file__)
        # Only the mime type is checked, the reported encoding is ignored.
        mimetype, _ = libcloud.utils.files.guess_file_mime_type(file_path=file_path)

        self.assertIn("python", mimetype)

    def test_get_driver(self):
        driver = get_driver(drivers=DRIVERS, provider=Provider.DUMMY)
        self.assertIsNotNone(driver)

        with self.assertRaises(
            AttributeError, msg="Invalid provider, but an exception was not thrown"
        ):
            get_driver(drivers=DRIVERS, provider="fooba")

    def test_get_driver_string_and_enum_notation(self):
        # The enum constant and both string spellings must resolve to the same class.
        for provider in (Provider.DUMMY, "dummy", "DUMMY"):
            driver = get_driver(drivers=DRIVERS, provider=provider)
            self.assertEqual(driver, DummyNodeDriver)

    def test_set_driver(self):
        # DRIVERS is a module-level registry shared with other tests, so the
        # entry registered below is removed again once this test finishes.
        self.addCleanup(DRIVERS.pop, "testingset", None)

        # Set an existing driver
        with self.assertRaises(AttributeError):
            set_driver(
                DRIVERS,
                Provider.DUMMY,
                "libcloud.storage.drivers.dummy",
                "DummyStorageDriver",
            )

        # Register a new driver
        driver = set_driver(
            DRIVERS,
            "testingset",
            "libcloud.storage.drivers.dummy",
            "DummyStorageDriver",
        )

        self.assertIsNotNone(driver)

        # Register it again
        with self.assertRaises(AttributeError):
            set_driver(
                DRIVERS,
                "testingset",
                "libcloud.storage.drivers.dummy",
                "DummyStorageDriver",
            )

        # Register an invalid module
        with self.assertRaises(ImportError):
            set_driver(
                DRIVERS,
                "testingnew",
                "libcloud.storage.drivers.dummy1",
                "DummyStorageDriver",
            )

        # Register an invalid class
        with self.assertRaises(AttributeError):
            set_driver(
                DRIVERS,
                "testingnew",
                "libcloud.storage.drivers.dummy",
                "DummyStorageDriver1",
            )

    def test_deprecated_warning(self):
        warnings.showwarning = show_warning
        # Put the module-level switch back even if an assertion fails half way.
        self.addCleanup(
            setattr,
            libcloud.utils,
            "SHOW_DEPRECATION_WARNING",
            libcloud.utils.SHOW_DEPRECATION_WARNING,
        )

        libcloud.utils.SHOW_DEPRECATION_WARNING = False
        self.assertEqual(len(WARNINGS_BUFFER), 0)
        libcloud.utils.deprecated_warning("test_module")
        self.assertEqual(len(WARNINGS_BUFFER), 0)

        libcloud.utils.SHOW_DEPRECATION_WARNING = True
        self.assertEqual(len(WARNINGS_BUFFER), 0)
        libcloud.utils.deprecated_warning("test_module")
        self.assertEqual(len(WARNINGS_BUFFER), 1)

    def test_in_development_warning(self):
        warnings.showwarning = show_warning
        # Put the module-level switch back even if an assertion fails half way.
        self.addCleanup(
            setattr,
            libcloud.utils,
            "SHOW_IN_DEVELOPMENT_WARNING",
            libcloud.utils.SHOW_IN_DEVELOPMENT_WARNING,
        )

        libcloud.utils.SHOW_IN_DEVELOPMENT_WARNING = False
        self.assertEqual(len(WARNINGS_BUFFER), 0)
        libcloud.utils.in_development_warning("test_module")
        self.assertEqual(len(WARNINGS_BUFFER), 0)

        libcloud.utils.SHOW_IN_DEVELOPMENT_WARNING = True
        self.assertEqual(len(WARNINGS_BUFFER), 0)
        libcloud.utils.in_development_warning("test_module")
        self.assertEqual(len(WARNINGS_BUFFER), 1)

    def test_read_in_chunks_iterator_no_data(self):
        iterator = DummyIterator()
        generator1 = libcloud.utils.files.read_in_chunks(iterator=iterator, yield_empty=False)
        generator2 = libcloud.utils.files.read_in_chunks(iterator=iterator, yield_empty=True)

        # yield_empty=False
        count = 0
        for data in generator1:
            count += 1
            self.assertEqual(data, b(""))

        self.assertEqual(count, 0)

        # yield_empty=True
        count = 0
        for data in generator2:
            count += 1
            self.assertEqual(data, b(""))

        self.assertEqual(count, 1)

    def test_read_in_chunks_iterator(self):
        def iterator():
            for _ in range(0, 1000):
                yield "aa"

        # fill_size=False: every item is passed through unchanged
        chunk_count = 0
        for result in libcloud.utils.files.read_in_chunks(
            iterator(), chunk_size=10, fill_size=False
        ):
            chunk_count += 1
            self.assertEqual(result, b("aa"))
        self.assertEqual(chunk_count, 1000)

        # fill_size=True: 2000 bytes are regrouped into 200 chunks of 10 bytes
        chunk_count = 0
        for result in libcloud.utils.files.read_in_chunks(
            iterator(), chunk_size=10, fill_size=True
        ):
            chunk_count += 1
            self.assertEqual(result, b("a") * 10)
        self.assertEqual(chunk_count, 200)

    def test_read_in_chunks_large_iterator_batches(self):
        def iterator():
            for _ in range(0, 10):
                yield "a" * 10_000

        # fill_size=False: items larger than chunk_size are not split
        chunk_count = 0
        for result in libcloud.utils.files.read_in_chunks(
            iterator(), chunk_size=10, fill_size=False
        ):
            chunk_count += 1
            self.assertEqual(result, b("a") * 10_000)
        self.assertEqual(chunk_count, 10)

        # fill_size=True: 100000 bytes are split into 10000 chunks of 10 bytes
        chunk_count = 0
        for result in libcloud.utils.files.read_in_chunks(
            iterator(), chunk_size=10, fill_size=True
        ):
            chunk_count += 1
            self.assertEqual(result, b("a") * 10)
        self.assertEqual(chunk_count, 10_000)

    def test_read_in_chunks_filelike(self):
        class FakeFile(file):
            # File-like double: hands out ``size + 1`` characters on each of the
            # first 499 reads and an empty string on the 500th one.
            def __init__(self):
                self.remaining = 500

            def read(self, size):
                self.remaining -= 1
                if self.remaining == 0:
                    return ""
                return "b" * (size + 1)

        # fill_size=False: 499 chunks of 11 bytes, so the last index is 498
        for index, result in enumerate(
            libcloud.utils.files.read_in_chunks(FakeFile(), chunk_size=10, fill_size=False)
        ):
            self.assertEqual(result, b("b" * 11))

        self.assertEqual(index, 498)

        # fill_size=True: 5489 bytes become 548 chunks of 10 bytes plus a final
        # chunk of 9 bytes
        for index, result in enumerate(
            libcloud.utils.files.read_in_chunks(FakeFile(), chunk_size=10, fill_size=True)
        ):
            if index != 548:
                self.assertEqual(result, b("b" * 10))
            else:
                self.assertEqual(result, b("b" * 9))

        self.assertEqual(index, 548)

    def test_exhaust_iterator(self):
        def iterator_func():
            for _ in range(0, 1000):
                yield "aa"

        data = b("aa" * 1000)
        iterator = libcloud.utils.files.read_in_chunks(iterator=iterator_func())
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator)
        self.assertEqual(result, data)

        result = libcloud.utils.files.exhaust_iterator(iterator=iterator_func())
        self.assertEqual(result, data)

        data = "12345678990"
        iterator = StringIO(data)
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator)
        self.assertEqual(result, b(data))

    def test_exhaust_iterator_empty_iterator(self):
        data = ""
        iterator = StringIO(data)
        result = libcloud.utils.files.exhaust_iterator(iterator=iterator)
        self.assertEqual(result, b(data))

    def test_unicode_urlquote(self):
        # Regression tests for LIBCLOUD-429
        # Note: this is a unicode literal
        val = "\xe9"

        uri = urlquote(val)
        self.assertEqual(b(uri), b("%C3%A9"))

        # Unicode without unicode characters
        uri = urlquote("v=1")
        self.assertEqual(b(uri), b("v%3D1"))

        # Already-encoded bytestring without unicode characters
        uri = urlquote(b("v=1"))
        self.assertEqual(b(uri), b("v%3D1"))

    def test_get_secure_random_string(self):
        for size in range(1, 500):
            value = get_secure_random_string(size=size)
            self.assertEqual(len(value), size)

    def test_hexadigits(self):
        self.assertEqual(hexadigits(b("")), [])
        self.assertEqual(hexadigits(b("a")), ["61"])
        self.assertEqual(hexadigits(b("AZaz09-")), ["41", "5a", "61", "7a", "30", "39", "2d"])

    def test_bchr(self):
        self.assertEqual(bchr(0), b"\x00")
        self.assertEqual(bchr(97), b"a")
| |
| |
class NetworkingUtilsTestCase(unittest.TestCase):
    """
    Tests for the IP address helpers imported from
    ``libcloud.utils.networking``.
    """

    def test_is_public_and_is_private_subnet(self):
        public_ips = ["213.151.0.8", "86.87.86.1", "8.8.8.8", "8.8.4.4"]
        private_ips = ["192.168.1.100", "10.0.0.1", "172.16.0.0"]

        # (addresses, assertion for is_public_subnet, assertion for is_private_subnet)
        expectations = (
            (public_ips, self.assertTrue, self.assertFalse),
            (private_ips, self.assertFalse, self.assertTrue),
        )

        for addresses, check_public, check_private in expectations:
            for ip in addresses:
                public_result = is_public_subnet(ip=ip)
                private_result = is_private_subnet(ip=ip)

                check_public(public_result)
                check_private(private_result)

    def test_is_valid_ip_address(self):
        valid_by_family = (
            (
                socket.AF_INET,
                ["192.168.1.100", "10.0.0.1", "213.151.0.8", "77.77.77.77"],
            ),
            (
                socket.AF_INET6,
                [
                    "fe80::200:5aee:feaa:20a2",
                    "2607:f0d0:1002:51::4",
                    "2607:f0d0:1002:0051:0000:0000:0000:0004",
                    "::1",
                ],
            ),
        )

        # Malformed IPv4 addresses followed by malformed IPv6 addresses
        invalid_addresses = [
            "10.1",
            "256.256.256.256",
            "0.567.567.567",
            "192.168.0.257",
            "2607:f0d",
            "2607:f0d0:0004",
        ]

        # Well-formed addresses are accepted for their own address family
        for family, addresses in valid_by_family:
            for candidate in addresses:
                self.assertTrue(is_valid_ip_address(address=candidate, family=family))

        # Malformed addresses are rejected regardless of the family asked for
        for family in (socket.AF_INET, socket.AF_INET6):
            for candidate in invalid_addresses:
                self.assertFalse(is_valid_ip_address(address=candidate, family=family))

    def test_join_ipv4_segments(self):
        cases = (
            (("127", "0", "0", "1"), "127.0.0.1"),
            (("255", "255", "255", "0"), "255.255.255.0"),
        )

        for octets, expected in cases:
            self.assertEqual(join_ipv4_segments(segments=octets), expected)

    def test_increment_ipv4_segments(self):
        # The last two cases carry over into the next segment(s)
        cases = (
            (("127", "0", "0", "1"), "127.0.0.2"),
            (("255", "255", "255", "0"), "255.255.255.1"),
            (("254", "255", "255", "255"), "255.0.0.0"),
            (("100", "1", "0", "255"), "100.1.1.0"),
        )

        for octets, expected in cases:
            incremented = increment_ipv4_segments(segments=octets)
            self.assertEqual(join_ipv4_segments(segments=incremented), expected)
| |
| |
class TestPublicKeyUtils(unittest.TestCase):
    """
    Tests for the public key fingerprint helpers imported from
    ``libcloud.utils.publickey``.
    """

    # RSA public key in OpenSSH format, shared by both fingerprint tests
    PUBKEY = (
        "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDOfbWSXOlqvYjZmRO84/lIoV4gvuX+"
        "P1lLg50MMg6jZjLZIlYY081XPRmuom0xY0+BO++J2KgLl7gxJ6xMsKK2VQ+TakdfAH20"
        "XfMcTohd/zVCeWsbqZQvEhVXBo4hPIktcfNz0u9Ez3EtInO+kb7raLcRhOVi9QmOkOrC"
        "WtQU9mS71AWJuqI9H0YAnTiI8Hs5bn2tpMIqmTXT3g2bwywC25x1Nx9Hy0/FP+KUL6Ag"
        "vDXv47l+TgSDfTBEkvq+IF1ITrnaOG+nRE02oZC6cwHYTifM/IOollkujxIQmi2Z+j66"
        "OHSrjnEQugr0FqGJF2ygKfIh/i2u3fVLM60qE2NN user@example"
    )

    def test_pubkey_openssh_fingerprint(self):
        expected = "35:22:13:5b:82:e2:5d:e1:90:8c:73:74:9f:ef:3b:d8"
        fingerprint = get_pubkey_openssh_fingerprint(self.PUBKEY)
        self.assertEqual(fingerprint, expected)

    def test_pubkey_ssh2_fingerprint(self):
        expected = "11:ad:5d:4c:5b:99:c9:80:7e:81:03:76:5a:25:9d:8c"
        fingerprint = get_pubkey_ssh2_fingerprint(self.PUBKEY)
        self.assertEqual(fingerprint, expected)
| |
| |
def test_decorator():
    # A generic exception escaping a function wrapped with
    # wrap_non_libcloud_exceptions is expected to surface as LibcloudError.
    def raise_generic_error():
        raise Exception("bork")

    wrapped = wrap_non_libcloud_exceptions(raise_generic_error)

    with pytest.raises(LibcloudError):
        wrapped()
| |
| |
if __name__ == "__main__":
    # Executed directly as a script: run the tests in this module and pass
    # whatever unittest.main() returns on to sys.exit().
    exit_status = unittest.main()
    sys.exit(exit_status)