| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import sys |
| from six import assertRegex |
| |
| from libcloud.utils.py3 import httplib |
| from libcloud.test import MockHttp |
| from libcloud.test.file_fixtures import ComputeFileFixtures |
| from libcloud.compute.types import Provider |
| from libcloud.compute.types import NodeState |
| from libcloud.compute.base import NodeAuthPassword, NodeAuthSSHKey |
| from libcloud.compute.providers import get_driver |
| from libcloud.test import unittest |
| from libcloud.test.secrets import PROFIT_BRICKS_PARAMS |
| from libcloud.common.exceptions import BaseHTTPError |
| |
| |
| class ProfitBricksTests(unittest.TestCase): |
| def setUp(self): |
| ProfitBricks = get_driver(Provider.PROFIT_BRICKS) |
| ProfitBricks.connectionCls.conn_class = ProfitBricksMockHttp |
| self.driver = ProfitBricks(*PROFIT_BRICKS_PARAMS) |
| |
| """ |
| Function tests for listing items |
| """ |
| |
| def test_list_sizes(self): |
| sizes = self.driver.list_sizes() |
| self.assertEqual(len(sizes), 7) |
| |
| def test_list_images(self): |
| |
| """ |
| Fetch all images and then fetch with filters |
| """ |
| all_images = self.driver.list_images() |
| hdd_images = self.driver.list_images("HDD") |
| cdd_images = self.driver.list_images("CDROM") |
| private_images = self.driver.list_images(is_public=False) |
| |
| self.assertEqual(len(all_images), 4) |
| self.assertEqual(len(hdd_images), 2) |
| self.assertEqual(len(cdd_images), 2) |
| self.assertEqual(len(private_images), 2) |
| |
| image = all_images[0] |
| extra = image.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(image.id, "img-1") |
| self.assertEqual(image.name, "Test-Image-Two-CDROM") |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2014-11-14T15:22:19Z") |
| self.assertEqual(extra["created_by"], "System") |
| self.assertEqual(extra["etag"], "957e0eac7456fa7554e73bf0d18860eb") |
| self.assertEqual(extra["last_modified_date"], "2014-11-14T15:22:19Z") |
| self.assertEqual(extra["last_modified_by"], "System") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "Test-Image-Two-CDROM") |
| self.assertEqual(extra["description"], "") |
| self.assertEqual(extra["location"], "us/las") |
| self.assertEqual(extra["size"], 4) |
| self.assertEqual(extra["cpu_hot_plug"], False) |
| |
| self.assertEqual(extra["cpu_hot_unplug"], False) |
| self.assertEqual(extra["ram_hot_plug"], False) |
| self.assertEqual(extra["ram_hot_unplug"], False) |
| self.assertEqual(extra["nic_hot_plug"], False) |
| self.assertEqual(extra["nic_hot_unplug"], False) |
| |
| self.assertEqual(extra["disc_virtio_hot_plug"], False) |
| self.assertEqual(extra["disc_virtio_hot_unplug"], False) |
| self.assertEqual(extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(extra["licence_type"], "OTHER") |
| |
| self.assertEqual(extra["image_type"], "CDROM") |
| self.assertEqual(extra["public"], True) |
| self.assertIsInstance(extra["image_aliases"], list) |
| |
| def test_list_locations(self): |
| |
| locations = self.driver.list_locations() |
| |
| self.assertTrue(len(locations) > 0) |
| |
| """ |
| Standard properties |
| """ |
| location = locations[2] |
| self.assertEqual(location.id, "us/las") |
| self.assertEqual(location.name, "lasvegas") |
| self.assertEqual(location.country, "us") |
| |
| def test_list_nodes(self): |
| |
| nodes = self.driver.list_nodes() |
| |
| self.assertEqual(len(nodes), 2) |
| |
| node = nodes[0] |
| extra = node.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(node.id, "srv-1") |
| self.assertEqual(node.name, "libcloud Test") |
| self.assertEqual(node.state, NodeState.RUNNING) |
| self.assertEqual(node.public_ips, []) |
| self.assertEqual(node.private_ips, []) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-18T07:28:05Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "e7cf186125f51f3d9511754a40dcd12c") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T07:28:05Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["availability_zone"], "AUTO") |
| self.assertEqual(extra["boot_cdrom"], None) |
| self.assertEqual(extra["boot_volume"]["id"], "bvol-1") |
| self.assertEqual( |
| extra["boot_volume"]["href"], |
| ("/cloudapi/v4/datacenters/dc-1" "/volumes/bvol-1"), |
| ) |
| self.assertEqual(extra["boot_volume"]["properties"]["name"], "libcloud Test") |
| self.assertEqual(extra["boot_volume"]["properties"]["type"], "HDD") |
| self.assertEqual(extra["boot_volume"]["properties"]["size"], 2) |
| self.assertEqual(extra["boot_volume"]["properties"]["image"], "bvol-img") |
| self.assertEqual(extra["cpu_family"], "AMD_OPTERON") |
| |
| """ |
| Other miscellaneous |
| """ |
| self.assertEqual(len(extra["entities"]), 3) |
| self.assertNotIn("status_url", extra) |
| |
| def test_ex_list_availability_zones(self): |
| |
| zones = self.driver.ex_list_availability_zones() |
| self.assertEqual(len(zones), 3) |
| |
| zones_sorted = sorted(list(a.name for a in zones)) |
| zones_expected = ["AUTO", "ZONE_1", "ZONE_2"] |
| self.assertEqual(zones_sorted, zones_expected) |
| |
| def test_list_volumes(self): |
| |
| volumes = self.driver.list_volumes() |
| self.assertTrue(len(volumes) > 0) |
| |
| def test_ex_list_datacenters(self): |
| |
| datacenters = self.driver.ex_list_datacenters() |
| self.assertTrue(len(datacenters) > 0) |
| |
| datacenter = datacenters[0] |
| extra = datacenter.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(datacenter.id, "dc-1") |
| self.assertEqual(datacenter.href, "/cloudapi/v4/datacenters/dc-1") |
| self.assertEqual(datacenter.name, "libcloud Test") |
| self.assertEqual(datacenter.version, 3) |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["description"], "libcloud test datacenter") |
| self.assertEqual(extra["location"], "us/las") |
| self.assertEqual(extra["version"], 3) |
| self.assertEqual(extra["features"], ["SSD", "MULTIPLE_CPU"]) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-14T07:24:59Z") |
| self.assertEqual(extra["created_by"], "test@test.test") |
| self.assertEqual(extra["etag"], "bdddec2287cb7723e86ac088bf644606") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T15:27:25Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.test") |
| |
| self.assertEqual(extra["state"], "AVAILABLE") |
| self.assertEqual(extra["provisioning_state"], NodeState.RUNNING) |
| self.assertEqual(len(extra["entities"]), 4) |
| self.assertNotIn("status_url", extra) |
| |
| def test_list_snapshots(self): |
| |
| volume_snapshots = self.driver.list_snapshots() |
| self.assertTrue(len(volume_snapshots) > 0) |
| |
| snapshot = volume_snapshots[0] |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(snapshot.id, "sshot") |
| self.assertEqual(snapshot.size, 10) |
| self.assertEqual(snapshot.created, "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.state, NodeState.RUNNING) |
| self.assertEqual( |
| snapshot.name, "Balancer Testing 1 Storage-Snapshot-10/26/2016" |
| ) |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual( |
| snapshot.extra["name"], "Balancer Testing 1 Storage-Snapshot-10/26/2016" |
| ) |
| self.assertEqual( |
| snapshot.extra["description"], |
| ('Created from "Balancer Testing 1' ' Storage" in Data Center "Snapshot"'), |
| ) |
| self.assertEqual(snapshot.extra["location"], "us/las") |
| self.assertEqual(snapshot.extra["size"], 10) |
| self.assertEqual(snapshot.extra["cpu_hot_plug"], True) |
| self.assertEqual(snapshot.extra["cpu_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["ram_hot_plug"], True) |
| self.assertEqual(snapshot.extra["ram_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["nic_hot_plug"], True) |
| self.assertEqual(snapshot.extra["nic_hot_unplug"], True) |
| self.assertEqual(snapshot.extra["disc_virtio_hot_plug"], True) |
| self.assertEqual(snapshot.extra["disc_virtio_hot_unplug"], True) |
| self.assertEqual(snapshot.extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(snapshot.extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["licence_type"], "LINUX") |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(snapshot.extra["created_date"], "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.extra["created_by"], "test@test.te") |
| self.assertEqual(snapshot.extra["etag"], "01873262ac042b5f44ed33b4241225b4") |
| self.assertEqual(snapshot.extra["last_modified_date"], "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.extra["last_modified_by"], "test@test.te") |
| self.assertEqual(snapshot.extra["state"], "AVAILABLE") |
| |
| """ |
| Function tests for operations on volume snapshots |
| """ |
| |
| def test_create_volume_snapshot(self): |
| volume = self.driver.ex_describe_volume( |
| ("/cloudapi/v4/datacenters/" "dc-1/" "volumes/vol-2") |
| ) |
| snapshot = self.driver.create_volume_snapshot(volume=volume) |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(snapshot.id, "sshot") |
| self.assertEqual(snapshot.size, 10) |
| self.assertEqual(snapshot.created, "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.state, NodeState.PENDING) |
| self.assertEqual(snapshot.name, "libcloud Test") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(snapshot.extra["name"], "libcloud Test") |
| self.assertEqual(snapshot.extra["description"], "libcloud test snapshot") |
| self.assertEqual(snapshot.extra["location"], "us/las") |
| self.assertEqual(snapshot.extra["size"], 10) |
| self.assertEqual(snapshot.extra["cpu_hot_plug"], True) |
| self.assertEqual(snapshot.extra["cpu_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["ram_hot_plug"], True) |
| self.assertEqual(snapshot.extra["ram_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["nic_hot_plug"], True) |
| self.assertEqual(snapshot.extra["nic_hot_unplug"], True) |
| self.assertEqual(snapshot.extra["disc_virtio_hot_plug"], True) |
| self.assertEqual(snapshot.extra["disc_virtio_hot_unplug"], True) |
| |
| self.assertEqual(snapshot.extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(snapshot.extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["licence_type"], "LINUX") |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(snapshot.extra["created_date"], "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.extra["created_by"], "test@test.te") |
| self.assertEqual(snapshot.extra["etag"], "01873262ac042b5f44ed33b4241225b4") |
| self.assertEqual(snapshot.extra["last_modified_date"], "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.extra["last_modified_by"], "test@test.te") |
| self.assertEqual(snapshot.extra["state"], "BUSY") |
| |
| def test_create_volume_snapshot_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no volume" |
| self.driver.create_volume_snapshot(volume=None) |
| |
| def test_ex_describe_snapshot(self): |
| snapshot_w_href = self.driver.ex_describe_snapshot( |
| ex_href="/cloudapi/v4/snapshots/sshot" |
| ) |
| snapshot_w_id = self.driver.ex_describe_snapshot(ex_snapshot_id="sshot") |
| self._verify_snapshot(snapshot=snapshot_w_href) |
| self._verify_snapshot(snapshot=snapshot_w_id) |
| |
| def test_ex_describe_snapshot_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_snapshot(ex_snapshot_id="00000000") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_snapshot(self, snapshot): |
| """ |
| Standard properties |
| """ |
| self.assertEqual(snapshot.id, "sshot") |
| self.assertEqual(snapshot.size, 10) |
| self.assertEqual(snapshot.created, "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.state, NodeState.RUNNING) |
| self.assertEqual(snapshot.name, "libcloud Test") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(snapshot.extra["name"], "libcloud Test") |
| self.assertEqual(snapshot.extra["description"], "libcloud test snapshot") |
| self.assertEqual(snapshot.extra["location"], "us/las") |
| self.assertEqual(snapshot.extra["size"], 10) |
| self.assertEqual(snapshot.extra["cpu_hot_plug"], True) |
| self.assertEqual(snapshot.extra["cpu_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["ram_hot_plug"], True) |
| self.assertEqual(snapshot.extra["ram_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["nic_hot_plug"], True) |
| self.assertEqual(snapshot.extra["nic_hot_unplug"], True) |
| self.assertEqual(snapshot.extra["disc_virtio_hot_plug"], True) |
| self.assertEqual(snapshot.extra["disc_virtio_hot_unplug"], True) |
| self.assertEqual(snapshot.extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(snapshot.extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(snapshot.extra["licence_type"], "LINUX") |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(snapshot.extra["created_date"], "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.extra["created_by"], "test@test.te") |
| self.assertEqual(snapshot.extra["etag"], "01873262ac042b5f44ed33b4241225b4") |
| self.assertEqual(snapshot.extra["last_modified_date"], "2016-10-26T11:38:45Z") |
| self.assertEqual(snapshot.extra["last_modified_by"], "test@test.te") |
| self.assertEqual(snapshot.extra["state"], "AVAILABLE") |
| |
| def test_ex_update_snapshot(self): |
| snapshot = self.driver.ex_describe_snapshot( |
| ex_href="/cloudapi/v4/snapshots/sshot" |
| ) |
| updated = self.driver.ex_update_snapshot( |
| snapshot=snapshot, |
| name="libcloud Test - RENAME", |
| description="libcloud test snapshot - RENAME", |
| ) |
| |
| self.assertEqual(updated.name, "libcloud Test - RENAME") |
| self.assertEqual( |
| updated.extra["description"], "libcloud test snapshot - RENAME" |
| ) |
| |
| def test_restore_volume_snapshot(self): |
| volume = self.driver.ex_describe_volume( |
| ex_datacenter_id="dc-1", ex_volume_id="vol-2" |
| ) |
| snapshot = self.driver.ex_describe_snapshot(ex_snapshot_id="sshot") |
| restored = self.driver.ex_restore_volume_snapshot( |
| volume=volume, snapshot=snapshot |
| ) |
| self.assertTrue(restored) |
| |
| def test_destroy_volume_snapshot(self): |
| snapshot = self.driver.ex_describe_snapshot( |
| ex_href="/cloudapi/v4/snapshots/sshot" |
| ) |
| destroyed = self.driver.destroy_volume_snapshot(snapshot) |
| self.assertTrue(destroyed) |
| |
| """ |
| Function tests for operations on nodes (servers) |
| """ |
| |
| def test_reboot_node(self): |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/dc-1" "/servers/srv-1") |
| ) |
| rebooted = self.driver.reboot_node(node=node) |
| self.assertTrue(rebooted) |
| |
| def test_create_node(self): |
| image = self.driver.ex_describe_image(ex_href="/cloudapi/v4/images/img-2") |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href="/cloudapi/v4/datacenters/dc-1" |
| ) |
| |
| ssh_key = NodeAuthSSHKey("ssh-rsa AAAAB3NzaC1") |
| password = NodeAuthPassword("secretpassword1233") |
| |
| node = self.driver.create_node( |
| name="libcloud Test", |
| image=image, |
| ex_ram=1024, |
| ex_cores=1, |
| ex_disk=2, |
| ex_bus_type="VIRTIO", |
| ex_disk_type="HDD", |
| ex_cpu_family="INTEL_XEON", |
| ex_password=password, |
| ex_ssh_keys=[ssh_key], |
| ex_datacenter=datacenter, |
| ) |
| |
| extra = node.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(node.id, "srv-2") |
| self.assertEqual(node.name, "libcloud Test") |
| self.assertEqual(node.state, NodeState.UNKNOWN) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-19T13:25:19Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "9bea2412ac556b402a07260fc0d1603f") |
| self.assertEqual(extra["last_modified_date"], "2016-10-19T13:25:19Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["cores"], 1) |
| self.assertEqual(extra["ram"], 1024) |
| self.assertEqual(extra["availability_zone"], "ZONE_1") |
| self.assertEqual(extra["vm_state"], None) |
| |
| self.assertEqual(extra["boot_cdrom"], None) |
| self.assertEqual(extra["boot_volume"], None) |
| self.assertEqual(extra["cpu_family"], "INTEL_XEON") |
| |
| """ |
| Extra entities |
| """ |
| self.assertEqual(len(extra["entities"]["volumes"]["items"]), 1) |
| |
| def test_create_node_failure(self): |
| image = self.driver.ex_describe_image(ex_href="/cloudapi/v4/images/img-2") |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href="/cloudapi/v4/datacenters/dc-1" |
| ) |
| sizes = self.driver.list_sizes() |
| |
| with self.assertRaises(ValueError): |
| "Raises value error if no size or ex_ram" |
| self.driver.create_node( |
| name="libcloud Test", image=image, ex_disk=40, ex_cores=1 |
| ) |
| |
| with self.assertRaises(ValueError): |
| "Raises value error if no size or ex_cores" |
| self.driver.create_node( |
| name="libcloud Test", image=image, ex_disk=40, ex_ram=1024 |
| ) |
| |
| with self.assertRaises(ValueError): |
| "Raises value error if no size or ex_disk" |
| self.driver.create_node( |
| name="libcloud Test", image=image, ex_cores=2, ex_ram=1024 |
| ) |
| |
| with self.assertRaises(ValueError): |
| "Raises value error if no ssh keys or password" |
| self.driver.create_node( |
| name="libcloud Test", |
| image=image, |
| size=sizes[1], |
| ex_datacenter=datacenter, |
| ) |
| |
| def test_destroy_node(self): |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| destroyed = self.driver.destroy_node(node=node) |
| self.assertTrue(destroyed) |
| |
| def test_ex_list_attached_volumes(self): |
| |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/servers/" "srv-1") |
| ) |
| |
| attached_volumes = self.driver.ex_list_attached_volumes(node) |
| self.assertTrue(len(attached_volumes) > 0) |
| |
| def test_attach_volume(self): |
| |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| volume = self.driver.ex_describe_volume( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "volumes/vol-2") |
| ) |
| |
| attached = self.driver.attach_volume(node=node, volume=volume) |
| extra = attached.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(attached.id, "vol-2") |
| self.assertEqual(attached.name, "libcloud Test") |
| self.assertEqual(attached.size, 2) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T13:13:36Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "c1800ce349033f9cd2c095ea1ea4976a") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T13:47:52Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["type"], "HDD") |
| self.assertEqual(extra["size"], 2) |
| self.assertEqual(extra["image"], "bvol-img") |
| |
| self.assertEqual(extra["image_password"], None) |
| self.assertEqual(extra["ssh_keys"], None) |
| self.assertEqual(extra["bus"], "VIRTIO") |
| self.assertEqual(extra["licence_type"], "UNKNOWN") |
| self.assertEqual(extra["cpu_hot_plug"], True) |
| |
| self.assertEqual(extra["cpu_hot_unplug"], False) |
| self.assertEqual(extra["ram_hot_plug"], True) |
| self.assertEqual(extra["ram_hot_unplug"], False) |
| self.assertEqual(extra["nic_hot_plug"], True) |
| self.assertEqual(extra["nic_hot_unplug"], True) |
| |
| self.assertEqual(extra["disc_virtio_hot_plug"], True) |
| self.assertEqual(extra["disc_virtio_hot_unplug"], True) |
| self.assertEqual(extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(extra["device_number"], 2) |
| |
| self.assertNotIn("availability_zone", extra) |
| |
| def test_detach_volume(self): |
| |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| volume = self.driver.ex_describe_volume( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "volumes/vol-2") |
| ) |
| detached = self.driver.detach_volume(node=node, volume=volume) |
| self.assertTrue(detached) |
| |
| def test_ex_stop_node(self): |
| |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| stopped = self.driver.ex_stop_node(node) |
| self.assertTrue(stopped) |
| |
| def test_ex_start_node(self): |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| started = self.driver.ex_start_node(node) |
| self.assertTrue(started) |
| |
| def test_ex_describe_node(self): |
| node_w_href = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| node_w_id = self.driver.ex_describe_node( |
| ex_datacenter_id="dc-1", ex_node_id="srv-1" |
| ) |
| self._verify_node(node=node_w_href) |
| self._verify_node(node=node_w_id) |
| |
| def test_ex_describe_node_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_node(ex_datacenter_id="dc-1", ex_node_id="00000000") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_node(self, node): |
| extra = node.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(node.id, "srv-1") |
| self.assertEqual(node.name, "libcloud Test") |
| self.assertEqual(node.state, NodeState.RUNNING) |
| self.assertEqual(node.public_ips, []) |
| self.assertEqual(node.private_ips, []) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-18T07:28:05Z") |
| self.assertEqual(extra["created_by"], "test@test.test") |
| self.assertEqual(extra["etag"], "e7cf186125f51f3d9511754a40dcd12c") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T07:28:05Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.test") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["availability_zone"], "ZONE_1") |
| self.assertEqual(extra["boot_cdrom"], None) |
| self.assertEqual(extra["boot_volume"]["id"], "bvol-1") |
| self.assertEqual( |
| extra["boot_volume"]["href"], |
| ("/cloudapi/v4/datacenters/" "dc-1/" "volumes/bvol-1"), |
| ) |
| self.assertEqual(extra["boot_volume"]["properties"]["name"], "libcloud Test") |
| |
| self.assertEqual(extra["boot_volume"]["properties"]["type"], "HDD") |
| self.assertEqual(extra["boot_volume"]["properties"]["size"], 2) |
| self.assertEqual(extra["boot_volume"]["properties"]["image"], "bvol-img") |
| self.assertEqual(extra["cpu_family"], "AMD_OPTERON") |
| |
| """ |
| Other miscellaneous |
| """ |
| self.assertEqual(len(extra["entities"]), 3) |
| self.assertNotIn("status_url", extra) |
| |
| def test_ex_update_node(self): |
| |
| node = self.driver.ex_describe_node( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| updated = self.driver.ex_update_node(node=node, name="libcloud Test RENAME") |
| |
| self.assertEqual(updated.id, "srv-1") |
| self.assertEqual(updated.name, "libcloud Test RENAME") |
| |
| """ |
| Function tests for operations on volumes |
| """ |
| |
| def test_create_volume(self): |
| |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| image = self.driver.ex_describe_image(ex_href="/cloudapi/v4/images/img-2") |
| created = self.driver.create_volume( |
| size=30, |
| name="Test volume", |
| ex_type="HDD", |
| ex_datacenter=datacenter, |
| image=image, |
| ex_ssh_keys=[NodeAuthSSHKey("ssh-rsa AAAAB3NzaC1")], |
| ) |
| |
| self.assertTrue(created) |
| |
| def test_create_volume_failure(self): |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| with self.assertRaises(ValueError): |
| "Raises value error if no size" |
| self.driver.create_volume( |
| size=30, |
| name="libcloud Test", |
| ex_type="HDD", |
| ex_bus_type="IDE", |
| ex_datacenter=datacenter, |
| image=None, |
| ) |
| |
| def test_destroy_volume(self): |
| |
| volume = self.driver.ex_describe_volume( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/volumes/" "vol-2") |
| ) |
| destroyed = self.driver.destroy_volume(volume=volume) |
| |
| self.assertTrue(destroyed) |
| |
| def test_ex_update_volume(self): |
| |
| volume = self.driver.ex_describe_volume( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "volumes/vol-2") |
| ) |
| updated = self.driver.ex_update_volume( |
| volume=volume, |
| ex_storage_name="Updated volume", |
| size=48, |
| ex_bus_type="VIRTIO", |
| ) |
| |
| extra = updated.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(updated.id, "vol-2") |
| self.assertEqual(updated.name, "libcloud Test - RENAME") |
| self.assertEqual(updated.size, 5) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T13:13:36Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "c1800ce349033f9cd2c095ea1ea4976a") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T13:47:52Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test - RENAME") |
| self.assertEqual(extra["type"], "HDD") |
| self.assertEqual(extra["size"], 5) |
| self.assertEqual(extra["availability_zone"], "ZONE_3") |
| self.assertEqual(extra["image"], "bvol-img") |
| |
| self.assertEqual(extra["image_password"], None) |
| self.assertEqual(extra["ssh_keys"], None) |
| self.assertEqual(extra["bus"], "VIRTIO") |
| self.assertEqual(extra["licence_type"], "UNKNOWN") |
| self.assertEqual(extra["cpu_hot_plug"], True) |
| |
| self.assertEqual(extra["cpu_hot_unplug"], False) |
| self.assertEqual(extra["ram_hot_plug"], True) |
| self.assertEqual(extra["ram_hot_unplug"], False) |
| self.assertEqual(extra["nic_hot_plug"], True) |
| self.assertEqual(extra["nic_hot_unplug"], True) |
| |
| self.assertEqual(extra["disc_virtio_hot_plug"], True) |
| self.assertEqual(extra["disc_virtio_hot_unplug"], True) |
| self.assertEqual(extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(extra["device_number"], 3) |
| |
| return {} |
| |
| def test_ex_describe_volume(self): |
| volume_w_href = self.driver.ex_describe_volume( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "volumes/vol-2") |
| ) |
| volume_w_id = self.driver.ex_describe_volume( |
| ex_datacenter_id="dc-1", ex_volume_id="vol-2" |
| ) |
| self._verify_volume(volume=volume_w_href) |
| self._verify_volume(volume=volume_w_id) |
| |
| def test_ex_describe_volume_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_volume( |
| ex_datacenter_id="dc-1", ex_volume_id="00000000" |
| ) |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_volume(self, volume): |
| extra = volume.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(volume.id, "vol-2") |
| self.assertEqual(volume.name, "libcloud Test") |
| self.assertEqual(volume.size, 2) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T13:13:36Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "c1800ce349033f9cd2c095ea1ea4976a") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T13:47:52Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["type"], "HDD") |
| self.assertEqual(extra["size"], 2) |
| self.assertEqual(extra["availability_zone"], "ZONE_3") |
| self.assertEqual(extra["image"], "bvol-img") |
| |
| self.assertEqual(extra["image_password"], None) |
| self.assertEqual(extra["ssh_keys"], None) |
| self.assertEqual(extra["bus"], "VIRTIO") |
| self.assertEqual(extra["licence_type"], "UNKNOWN") |
| self.assertEqual(extra["cpu_hot_plug"], True) |
| |
| self.assertEqual(extra["cpu_hot_unplug"], False) |
| self.assertEqual(extra["ram_hot_plug"], True) |
| self.assertEqual(extra["ram_hot_unplug"], False) |
| self.assertEqual(extra["nic_hot_plug"], True) |
| self.assertEqual(extra["nic_hot_unplug"], True) |
| |
| self.assertEqual(extra["disc_virtio_hot_plug"], True) |
| self.assertEqual(extra["disc_virtio_hot_unplug"], True) |
| self.assertEqual(extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(extra["device_number"], 3) |
| |
| self.assertNotIn("status_url", extra) |
| |
| """ |
| Function tests for operations on data centers |
| """ |
| |
| def test_ex_create_datacenter(self): |
| location = self.driver.ex_describe_location(ex_location_id="us/las") |
| datacenter = self.driver.ex_create_datacenter( |
| name="libcloud Test", |
| location=location, |
| description="libcloud test datacenter", |
| ) |
| |
| extra = datacenter.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(datacenter.id, "dc-1") |
| self.assertEqual(datacenter.href, "/cloudapi/v4/datacenters/dc-1") |
| self.assertEqual(datacenter.name, "libcloud Test") |
| self.assertEqual(datacenter.version, None) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-18T17:20:56Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "c2d3d4d9bbdc0fff7d3c5c3864a68a46") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T17:20:56Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["description"], "libcloud test datacenter") |
| self.assertEqual(extra["location"], "us/las") |
| self.assertEqual(extra["version"], None) |
| self.assertEqual(extra["features"], []) |
| |
| """ |
| Miscellaneous properties |
| """ |
| self.assertNotIn("entities", extra) |
| self.assertEqual(extra["provisioning_state"], NodeState.PENDING) |
| |
| def test_ex_create_datacenter_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no location" |
| self.driver.ex_create_datacenter( |
| name="libcloud Test", |
| location=None, |
| description="libcloud test datacenter", |
| ) |
| |
| def test_ex_destroy_datacenter(self): |
| |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| destroyed = self.driver.ex_destroy_datacenter(datacenter=datacenter) |
| self.assertTrue(destroyed) |
| |
| def test_ex_describe_datacenter(self): |
| datacenter_w_href = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| datacenter_w_id = self.driver.ex_describe_datacenter(ex_datacenter_id="dc-1") |
| self._verify_datacenter(datacenter=datacenter_w_href) |
| self._verify_datacenter(datacenter=datacenter_w_id) |
| |
| def test_ex_describe_datacenter_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_datacenter(ex_datacenter_id="00000000") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_datacenter(self, datacenter): |
| extra = datacenter.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(datacenter.id, "dc-1") |
| self.assertEqual(datacenter.href, "/cloudapi/v4/datacenters/dc-1") |
| self.assertEqual(datacenter.name, "libcloud Test") |
| self.assertEqual(datacenter.version, 35) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T11:33:11Z") |
| self.assertEqual(extra["created_by"], "test@test.test") |
| self.assertEqual(extra["etag"], "53b215b8ec0356a649955dab019845a4") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T15:13:44Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.test") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["description"], "libcloud test datacenter") |
| self.assertEqual(extra["location"], "us/las") |
| self.assertEqual(extra["version"], 35) |
| self.assertEqual(extra["features"], ["SSD", "MULTIPLE_CPU"]) |
| |
| self.assertNotIn("status_url", extra) |
| self.assertEqual(extra["provisioning_state"], NodeState.RUNNING) |
| self.assertEqual(len(extra["entities"]), 4) |
| |
| def test_ex_rename_datacenter(self): |
| |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| renamed = self.driver.ex_rename_datacenter( |
| datacenter=datacenter, name="libcloud Test - RENAME" |
| ) |
| extra = renamed.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(renamed.id, "dc-1") |
| self.assertEqual(renamed.href, "/cloudapi/v4/datacenters/dc-1") |
| self.assertEqual(renamed.name, "libcloud Test - RENAME") |
| self.assertEqual(renamed.version, 35) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T11:33:11Z") |
| self.assertEqual(extra["created_by"], "test@test.test") |
| self.assertEqual(extra["etag"], "53b215b8ec0356a649955dab019845a4") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T15:13:44Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.test") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test - RENAME") |
| self.assertEqual(extra["description"], "libcloud test datacenter") |
| self.assertEqual(extra["location"], "us/las") |
| self.assertEqual(extra["version"], 35) |
| self.assertEqual(extra["features"], ["SSD", "MULTIPLE_CPU"]) |
| |
| self.assertNotIn("status_url", extra) |
| self.assertEqual(extra["provisioning_state"], NodeState.PENDING) |
| self.assertEqual(len(extra["entities"]), 4) |
| |
| """ |
| Function tests for operations on images |
| """ |
| |
| def test_ex_describe_image(self): |
| image_w_href = self.driver.ex_describe_image( |
| ex_href=("/cloudapi/v4/images/" "img-2") |
| ) |
| image_w_id = self.driver.ex_describe_image(ex_image_id="img-2") |
| self._verify_image(image=image_w_href) |
| self._verify_image(image=image_w_id) |
| |
| def test_ex_describe_image_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_image(ex_image_id="00000000") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_image(self, image): |
| extra = image.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(image.id, "img-2") |
| self.assertEqual(image.name, "vivid-server-cloudimg-amd64-disk1.img") |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2015-10-09T12:06:34Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "bbf76112358af2fc5dd1bf21de8988db") |
| self.assertEqual(extra["last_modified_date"], "2015-11-11T15:23:20Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "vivid-server-cloudimg-amd64-disk1.img") |
| self.assertEqual(extra["description"], None) |
| self.assertEqual(extra["location"], "us/las") |
| self.assertEqual(extra["size"], 2) |
| self.assertEqual(extra["cpu_hot_plug"], False) |
| |
| self.assertEqual(extra["cpu_hot_unplug"], False) |
| self.assertEqual(extra["ram_hot_plug"], False) |
| self.assertEqual(extra["ram_hot_unplug"], False) |
| self.assertEqual(extra["nic_hot_plug"], False) |
| self.assertEqual(extra["nic_hot_unplug"], False) |
| |
| self.assertEqual(extra["disc_virtio_hot_plug"], False) |
| self.assertEqual(extra["disc_virtio_hot_unplug"], False) |
| self.assertEqual(extra["disc_scsi_hot_plug"], False) |
| self.assertEqual(extra["disc_scsi_hot_unplug"], False) |
| self.assertEqual(extra["licence_type"], "UNKNOWN") |
| |
| self.assertEqual(extra["image_type"], "HDD") |
| self.assertEqual(extra["public"], False) |
| self.assertEqual(extra["href"], "/cloudapi/v4/images/img-2") |
| |
| self.assertIsInstance(extra["image_aliases"], list) |
| |
| def test_ex_update_image(self): |
| image = self.driver.ex_describe_image(ex_href=("/cloudapi/v4/images/" "img-2")) |
| updated = self.driver.ex_update_image(image=image, name="my-updated-image.img") |
| extra = updated.extra |
| |
| self.assertEqual(updated.name, "my-updated-image.img") |
| self.assertEqual(extra["last_modified_date"], "2016-11-11T15:23:20Z") |
| |
| def test_ex_delete_image(self): |
| image = self.driver.ex_describe_image(ex_href=("/cloudapi/v4/images/" "img-2")) |
| deleted = self.driver.ex_delete_image(image) |
| self.assertTrue(deleted) |
| |
| """ |
| Function tests for operations on network interfaces |
| """ |
| |
| def test_ex_list_network_interfaces(self): |
| |
| network_interfaces = self.driver.ex_list_network_interfaces() |
| self.assertTrue(len(network_interfaces) > 0) |
| |
| network_interface = network_interfaces[0] |
| extra = network_interface.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(network_interface.id, "nic-1") |
| self.assertEqual(network_interface.name, "libcloud Test") |
| self.assertEqual( |
| network_interface.href, |
| ("/cloudapi/v4/datacenters/" "dc-1/servers/" "s-3/nics/" "nic-1"), |
| ) |
| self.assertEqual(network_interface.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T15:46:38Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "dbd8216137cf0ec9951170f93fa8fa53") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T18:19:43Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["mac"], "02:01:0b:9d:4d:ce") |
| self.assertEqual(extra["ips"], ["10.15.124.11"]) |
| self.assertEqual(extra["dhcp"], False) |
| self.assertEqual(extra["lan"], 2) |
| self.assertEqual(extra["firewall_active"], True) |
| self.assertEqual(extra["nat"], False) |
| |
| def test_ex_describe_network_interface(self): |
| nic_w_href = self.driver.ex_describe_network_interface( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3" "/nics/nic-2") |
| ) |
| nic_w_id = self.driver.ex_describe_network_interface( |
| ex_datacenter_id="dc-1", ex_server_id="s-3", ex_nic_id="nic-2" |
| ) |
| self._verify_network_interface(network_interface=nic_w_href) |
| self._verify_network_interface(network_interface=nic_w_id) |
| |
| def test_ex_describe_network_interface_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_network_interface( |
| ex_datacenter_id="dc-1", ex_server_id="s-3", ex_nic_id="00000000" |
| ) |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_network_interface(self, network_interface): |
| extra = network_interface.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(network_interface.id, "nic-2") |
| self.assertEqual(network_interface.name, "libcloud Test") |
| self.assertEqual( |
| network_interface.href, |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-2"), |
| ) |
| self.assertEqual(network_interface.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T15:46:38Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "dbd8216137cf0ec9951170f93fa8fa53") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T18:19:43Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["mac"], "02:01:0b:9d:4d:ce") |
| self.assertEqual(extra["ips"], ["10.15.124.11"]) |
| self.assertEqual(extra["dhcp"], True) |
| self.assertEqual(extra["lan"], 2) |
| self.assertEqual(extra["firewall_active"], True) |
| self.assertEqual(extra["nat"], False) |
| |
| """ |
| Miscellaneous |
| """ |
| self.assertTrue(len(extra["entities"]), 1) |
| |
| def test_ex_create_network_interface(self): |
| |
| node = self.driver.ex_describe_node( |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1") |
| ) |
| network_interface = self.driver.ex_create_network_interface( |
| node=node, lan_id=1, dhcp_active=True, nic_name="libcloud Test" |
| ) |
| extra = network_interface.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(network_interface.id, "nic-2") |
| self.assertEqual(network_interface.name, "libcloud Test") |
| self.assertEqual( |
| network_interface.href, |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/srv-1" "/nics/nic-2"), |
| ) |
| self.assertEqual(network_interface.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-19T08:18:50Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "8679142b0b1b70c8b8c09a8b31e6ded9") |
| self.assertEqual(extra["last_modified_date"], "2016-10-19T08:18:50Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["mac"], None) |
| self.assertEqual(extra["ips"], ["10.0.0.1"]) |
| self.assertEqual(extra["dhcp"], True) |
| self.assertEqual(extra["lan"], 1) |
| self.assertEqual(extra["firewall_active"], None) |
| self.assertEqual(extra["nat"], None) |
| |
| def test_ex_create_network_interface_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no node" |
| self.driver.ex_create_network_interface( |
| node=None, lan_id=1, nic_name="libcloud Test" |
| ) |
| |
| def test_ex_update_network_interface(self): |
| |
| network_interface = self.driver.ex_describe_network_interface( |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3" "/nics/nic-2") |
| ) |
| updated = self.driver.ex_update_network_interface( |
| network_interface=network_interface, name="libcloud Test - RENAME" |
| ) |
| |
| extra = updated.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(updated.id, "nic-2") |
| self.assertEqual(updated.name, "libcloud Test - RENAME") |
| self.assertEqual( |
| updated.href, |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-2"), |
| ) |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test - RENAME") |
| |
| def test_ex_destroy_network_interface(self): |
| |
| network_interface = self.driver.ex_describe_network_interface( |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3" "/nics/nic-2") |
| ) |
| destroyed = self.driver.ex_destroy_network_interface( |
| network_interface=network_interface |
| ) |
| |
| self.assertTrue(destroyed) |
| |
| def test_ex_set_inet_access(self): |
| network_interface = self.driver.ex_describe_network_interface( |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3" "/nics/nic-2") |
| ) |
| updated = self.driver.ex_set_inet_access( |
| network_interface=network_interface, internet_access=False |
| ) |
| |
| self.assertTrue(updated) |
| |
| return {} |
| |
| """ |
| Function tests for operations on locations |
| """ |
| |
| def test_ex_describe_location(self): |
| location_w_href = self.driver.ex_describe_location( |
| ex_href=("/cloudapi/v4/locations/us/las") |
| ) |
| location_w_id = self.driver.ex_describe_location(ex_location_id="us/las") |
| self._verify_location(location=location_w_href) |
| self._verify_location(location=location_w_id) |
| |
| def test_ex_describe_location_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_location(ex_location_id="us/000") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_location(self, location): |
| self.assertEqual(location.id, "us/las") |
| self.assertEqual(location.name, "lasvegas") |
| self.assertEqual(location.country, "us") |
| |
| """ |
| Function tests for operations on firewall rules |
| """ |
| |
| def test_ex_list_firewall_rules(self): |
| network_interface = self.driver.ex_describe_network_interface( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-2") |
| ) |
| firewall_rules = self.driver.ex_list_firewall_rules(network_interface) |
| self.assertTrue(len(firewall_rules) > 0) |
| |
| firewall_rule = firewall_rules[0] |
| extra = firewall_rule.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(firewall_rule.id, "fwr-1") |
| self.assertEqual(firewall_rule.name, "Test updated firewall rule") |
| self.assertEqual( |
| firewall_rule.href, |
| ( |
| "/cloudapi/v4/datacenters/" |
| "dc-1/" |
| "servers/s-3/" |
| "nics/nic-2/" |
| "firewallrules/fwr-1" |
| ), |
| ) |
| self.assertEqual(firewall_rule.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-19T11:08:10Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "b91a2e082a7422dafb79d84a07fb2a28") |
| self.assertEqual(extra["last_modified_date"], "2016-10-19T11:19:04Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "Test updated firewall rule") |
| self.assertEqual(extra["protocol"], "TCP") |
| self.assertEqual(extra["source_mac"], None) |
| self.assertEqual(extra["source_ip"], None) |
| self.assertEqual(extra["target_ip"], None) |
| |
| self.assertEqual(extra["icmp_code"], None) |
| self.assertEqual(extra["icmp_type"], None) |
| self.assertEqual(extra["port_range_start"], 80) |
| self.assertEqual(extra["port_range_end"], 80) |
| |
| def test_ex_describe_firewall_rule(self): |
| firewall_rule_w_href = self.driver.ex_describe_firewall_rule( |
| ex_href=( |
| "/cloudapi/v4/datacenters/" |
| "dc-1/servers/" |
| "s-3/nics/" |
| "nic-2/firewallrules" |
| "/fw2" |
| ) |
| ) |
| firewall_rule_w_id = self.driver.ex_describe_firewall_rule( |
| ex_datacenter_id="dc-1", |
| ex_server_id="s-3", |
| ex_nic_id="nic-2", |
| ex_firewall_rule_id="fw2", |
| ) |
| self._verify_firewall_rule(firewall_rule=firewall_rule_w_href) |
| self._verify_firewall_rule(firewall_rule=firewall_rule_w_id) |
| |
| def test_ex_describe_firewall_rule_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_firewall_rule( |
| ex_datacenter_id="dc-1", |
| ex_server_id="s-3", |
| ex_nic_id="nic-2", |
| ex_firewall_rule_id="00", |
| ) |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_firewall_rule(self, firewall_rule): |
| extra = firewall_rule.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(firewall_rule.id, "fw2") |
| self.assertEqual(firewall_rule.name, "SSH") |
| self.assertEqual( |
| firewall_rule.href, |
| ( |
| "/cloudapi/v4/datacenters/" |
| "dc-1/servers/" |
| "s-3/nics/" |
| "nic-2/" |
| "firewallrules/fw2" |
| ), |
| ) |
| self.assertEqual(firewall_rule.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-19T09:55:10Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "00bb5b86562db1ed19ca38697e485160") |
| self.assertEqual(extra["last_modified_date"], "2016-10-19T09:55:10Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "SSH") |
| self.assertEqual(extra["protocol"], "TCP") |
| self.assertEqual(extra["source_mac"], "01:23:45:67:89:00") |
| self.assertEqual(extra["source_ip"], None) |
| self.assertEqual(extra["target_ip"], None) |
| |
| self.assertEqual(extra["icmp_code"], None) |
| self.assertEqual(extra["icmp_type"], None) |
| self.assertEqual(extra["port_range_start"], 22) |
| self.assertEqual(extra["port_range_end"], 22) |
| |
| def test_ex_create_firewall_rule(self): |
| |
| network_interface = self.driver.ex_describe_network_interface( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-2") |
| ) |
| |
| firewall_rule = self.driver.ex_create_firewall_rule( |
| network_interface=network_interface, |
| protocol="TCP", |
| name="SSH", |
| port_range_start=22, |
| port_range_end=22, |
| ) |
| |
| extra = firewall_rule.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(firewall_rule.id, "fwr-1") |
| self.assertEqual(firewall_rule.name, "SSH") |
| self.assertEqual( |
| firewall_rule.href, |
| ( |
| "/cloudapi/v4/datacenters/" |
| "dc-1/" |
| "servers/s-3/" |
| "nics/nic-2/" |
| "firewallrules/fwr-1" |
| ), |
| ) |
| self.assertEqual(firewall_rule.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-19T11:08:04Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "2a21551ba4adf85d9fb04b05a6938bcc") |
| self.assertEqual(extra["last_modified_date"], "2016-10-19T11:08:04Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "SSH") |
| self.assertEqual(extra["protocol"], "TCP") |
| self.assertEqual(extra["source_mac"], "01:23:45:67:89:00") |
| self.assertEqual(extra["source_ip"], None) |
| self.assertEqual(extra["target_ip"], None) |
| |
| self.assertEqual(extra["icmp_code"], None) |
| self.assertEqual(extra["icmp_type"], None) |
| self.assertEqual(extra["port_range_start"], 22) |
| self.assertEqual(extra["port_range_end"], 22) |
| |
| def test_ex_create_firewall_rule_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no network interface" |
| self.driver.ex_create_firewall_rule( |
| network_interface=None, protocol="TCP", name="SSH" |
| ) |
| |
| def test_ex_update_firewall_rule(self): |
| |
| firewall_rule = self.driver.ex_describe_firewall_rule( |
| ex_href=( |
| "/cloudapi/v4/datacenters/" |
| "dc-1/" |
| "servers/s-3/" |
| "nics/nic-2/" |
| "firewallrules/fw2" |
| ) |
| ) |
| updated = self.driver.ex_update_firewall_rule( |
| firewall_rule=firewall_rule, name="SSH - RENAME" |
| ) |
| |
| extra = updated.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(updated.id, "fw2") |
| self.assertEqual(updated.name, "SSH - RENAME") |
| |
| self.assertEqual(extra["name"], "SSH - RENAME") |
| |
| def test_ex_delete_firewall_rule(self): |
| |
| firewall_rule = self.driver.ex_describe_firewall_rule( |
| ex_href=( |
| "/cloudapi/v4/datacenters/" |
| "dc-1/" |
| "servers/s-3/" |
| "nics/nic-2/" |
| "firewallrules/fw2" |
| ) |
| ) |
| deleted = self.driver.ex_delete_firewall_rule(firewall_rule) |
| |
| self.assertTrue(deleted) |
| |
| """ |
| Function tests for operations on lans |
| """ |
| |
| def test_ex_list_lans(self): |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| lans = self.driver.ex_list_lans(datacenter=datacenter) |
| lan = lans[0] |
| extra = lan.extra |
| self.assertEqual(len(lans), 1) |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(lan.id, "1") |
| self.assertEqual(lan.href, ("/cloudapi/v4/datacenters/" "dc-1/lans/1")) |
| self.assertEqual(lan.name, "libcloud Test") |
| self.assertEqual(lan.is_public, False) |
| self.assertEqual(lan.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-24T08:03:22Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["last_modified_date"], "2016-10-24T08:03:22Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["is_public"], False) |
| |
| """ |
| Miscellaneous |
| """ |
| self.assertEqual(len(extra["entities"]), 1) |
| |
| def test_ex_create_lan(self): |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| lan = self.driver.ex_create_lan(datacenter=datacenter, is_public=True) |
| extra = lan.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(lan.id, "10") |
| self.assertEqual(lan.href, ("/cloudapi/v4/datacenters/" "dc-1/lans/10")) |
| self.assertEqual(lan.name, "libcloud Test") |
| self.assertEqual(lan.is_public, True) |
| self.assertEqual(lan.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T11:33:11Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "53b215b8ec0356a649955dab019845a4") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T15:13:44Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["is_public"], True) |
| |
| def test_ex_create_lan_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no datacenter" |
| self.driver.ex_create_lan(datacenter=None) |
| |
| def test_ex_describe_lan(self): |
| lan_w_href = self.driver.ex_describe_lan( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/lans/10") |
| ) |
| lan_w_id = self.driver.ex_describe_lan(ex_datacenter_id="dc-1", ex_lan_id="10") |
| self._verify_lan(lan=lan_w_href) |
| self._verify_lan(lan=lan_w_id) |
| |
| def test_ex_describe_lan_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_lan(ex_datacenter_id="dc-1", ex_lan_id="0") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_lan(self, lan): |
| extra = lan.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(lan.id, "10") |
| self.assertEqual(lan.href, ("/cloudapi/v4/datacenters/" "dc-1/lans/10")) |
| self.assertEqual(lan.name, "libcloud Test") |
| self.assertEqual(lan.is_public, True) |
| self.assertEqual(lan.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T11:33:11Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "53b215b8ec0356a649955dab019845a4") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T15:13:44Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["is_public"], True) |
| |
| def test_ex_update_lan(self): |
| lan = self.driver.ex_describe_lan( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/lans/10") |
| ) |
| updated = self.driver.ex_update_lan( |
| lan=lan, is_public=False, name="libcloud Test - RENAME" |
| ) |
| extra = updated.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(updated.id, "10") |
| self.assertEqual(updated.href, ("/cloudapi/v4/datacenters/" "dc-1/lans/10")) |
| self.assertEqual(updated.name, "libcloud Test - RENAME") |
| self.assertEqual(updated.is_public, False) |
| self.assertEqual(updated.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T11:33:11Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "53b215b8ec0356a649955dab019845a4") |
| self.assertEqual(extra["last_modified_date"], "2016-10-18T15:13:44Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test - RENAME") |
| self.assertEqual(extra["is_public"], False) |
| |
| def test_ex_delete_lan(self): |
| lan = self.driver.ex_describe_lan( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/lans/10") |
| ) |
| deleted = self.driver.ex_delete_lan(lan) |
| self.assertTrue(deleted) |
| |
| """ |
| Function tests for operations on load balancers |
| """ |
| |
| def test_ex_list_load_balancers(self): |
| load_balancers = self.driver.ex_list_load_balancers() |
| self.assertTrue(len(load_balancers) > 0) |
| |
| def test_ex_describe_load_balancer(self): |
| load_balancer_w_href = self.driver.ex_describe_load_balancer( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1") |
| ) |
| |
| load_balancer_w_id = self.driver.ex_describe_load_balancer( |
| ex_datacenter_id="dc-2", ex_load_balancer_id="bal-1" |
| ) |
| self._verify_load_balancer(load_balancer=load_balancer_w_href) |
| self._verify_load_balancer(load_balancer=load_balancer_w_id) |
| |
| def test_ex_describe_load_balancer_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_load_balancer( |
| ex_datacenter_id="dc-2", ex_load_balancer_id="00000000" |
| ) |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_load_balancer(self, load_balancer): |
| """ |
| Standard properties |
| """ |
| self.assertEqual(load_balancer.id, "bal-1") |
| self.assertEqual( |
| load_balancer.href, |
| ("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1"), |
| ) |
| self.assertEqual(load_balancer.name, "libcloud Test") |
| self.assertEqual(load_balancer.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(load_balancer.extra["created_date"], "2016-10-26T13:02:33Z") |
| self.assertEqual(load_balancer.extra["created_by"], "test@test.te") |
| self.assertEqual( |
| load_balancer.extra["etag"], "71e8df57a58615b9e15400ede4138b41" |
| ) |
| self.assertEqual( |
| load_balancer.extra["last_modified_date"], "2016-10-26T13:02:33Z" |
| ) |
| self.assertEqual(load_balancer.extra["last_modified_by"], "test@test.te") |
| self.assertEqual(load_balancer.extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(load_balancer.extra["name"], "libcloud Test") |
| self.assertIsNotNone(load_balancer.extra["ip"]) |
| self.assertEqual(load_balancer.extra["dhcp"], True) |
| |
| def test_ex_create_load_balancer(self): |
| datacenter = self.driver.ex_describe_datacenter( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1") |
| ) |
| |
| nics = self.driver.ex_list_network_interfaces() |
| |
| created = self.driver.ex_create_load_balancer( |
| datacenter=datacenter, name="libcloud Test", dhcp=True, nics=[nics[0]] |
| ) |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(created.id, "bal-1") |
| self.assertEqual( |
| created.href, ("/cloudapi/v4/datacenters" "/dc-1" "/loadbalancers/bal-1") |
| ) |
| self.assertEqual(created.name, "libcloud Test") |
| self.assertEqual(created.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(created.extra["created_date"], "2016-10-26T13:02:33Z") |
| self.assertEqual(created.extra["created_by"], "test@test.te") |
| self.assertEqual(created.extra["etag"], "71e8df57a58615b9e15400ede4138b41") |
| self.assertEqual(created.extra["last_modified_date"], "2016-10-26T13:02:33Z") |
| self.assertEqual(created.extra["last_modified_by"], "test@test.te") |
| self.assertEqual(created.extra["state"], "BUSY") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(created.extra["name"], "libcloud Test") |
| self.assertEqual(created.extra["ip"], None) |
| self.assertEqual(created.extra["dhcp"], True) |
| self.assertIsNotNone(created.extra["entities"]["balancednics"]) |
| |
| def test_ex_create_load_balancer_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no datacenter" |
| self.driver.ex_create_load_balancer(datacenter=None, name="libcloud Test") |
| |
| def test_ex_update_load_balancer(self): |
| load_balancer = self.driver.ex_describe_load_balancer( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1") |
| ) |
| |
| updated = self.driver.ex_update_load_balancer( |
| load_balancer=load_balancer, name="libcloud Test - RENAME" |
| ) |
| |
| self.assertEqual(updated.name, "libcloud Test - RENAME") |
| |
| def test_ex_list_load_balanced_nics(self): |
| load_balancer = self.driver.ex_describe_load_balancer( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1") |
| ) |
| |
| network_interfaces = self.driver.ex_list_load_balanced_nics(load_balancer) |
| |
| self.assertTrue(len(network_interfaces) > 0) |
| |
| network_interface = network_interfaces[0] |
| extra = network_interface.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(network_interface.id, "nic-1") |
| self.assertEqual(network_interface.name, "libcloud Test") |
| self.assertEqual( |
| network_interface.href, |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-1"), |
| ) |
| self.assertEqual(network_interface.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T15:46:38Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "dbd8216137cf0ec9951170f93fa8fa53") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T18:19:43Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| self.assertEqual(extra["mac"], "02:01:0b:9d:4d:ce") |
| self.assertEqual(extra["ips"], ["10.15.124.11"]) |
| self.assertEqual(extra["dhcp"], False) |
| self.assertEqual(extra["lan"], 2) |
| self.assertEqual(extra["firewall_active"], True) |
| self.assertEqual(extra["nat"], False) |
| |
| def test_ex_describe_load_balanced_nic(self): |
| network_interface_w_href = self.driver.ex_describe_network_interface( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-2") |
| ) |
| network_interface_w_id = self.driver.ex_describe_network_interface( |
| ex_datacenter_id="dc-1", ex_server_id="s-3", ex_nic_id="nic-2" |
| ) |
| self._verify_load_balanced_nic(network_interface=network_interface_w_href) |
| self._verify_load_balanced_nic(network_interface=network_interface_w_id) |
| |
| def _verify_load_balanced_nic(self, network_interface): |
| extra = network_interface.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(network_interface.id, "nic-2") |
| self.assertEqual(network_interface.name, "libcloud Test") |
| self.assertEqual( |
| network_interface.href, |
| ("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3" "/nics/nic-2"), |
| ) |
| self.assertEqual(network_interface.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-17T15:46:38Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "dbd8216137cf0ec9951170f93fa8fa53") |
| self.assertEqual(extra["last_modified_date"], "2016-10-17T18:19:43Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| """ |
| Extra properties |
| """ |
| self.assertEqual(extra["name"], "libcloud Test") |
| assertRegex(self, extra["mac"], "^([0-9a-f]{2}[:]){5}([0-9a-f]{2})$") |
| self.assertTrue(len(extra["ips"]) > 0) |
| self.assertEqual(extra["dhcp"], True) |
| self.assertIsInstance(extra["lan"], int) |
| self.assertEqual(extra["firewall_active"], True) |
| self.assertEqual(extra["nat"], False) |
| |
| def test_ex_attach_nic_to_load_balancer(self): |
| network_interface = self.driver.ex_describe_network_interface( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3" "/nics/nic-2") |
| ) |
| load_balancer = self.driver.ex_describe_load_balancer( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1") |
| ) |
| attached = self.driver.ex_attach_nic_to_load_balancer( |
| load_balancer=load_balancer, network_interface=network_interface |
| ) |
| self.assertTrue(attached) |
| |
| def test_ex_remove_nic_from_load_balancer(self): |
| network_interface = self.driver.ex_describe_network_interface( |
| ex_href=(("/cloudapi/v4/datacenters/" "dc-1/" "servers/s-3/" "nics/nic-2")) |
| ) |
| load_balancer = self.driver.ex_describe_load_balancer( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1") |
| ) |
| detached = self.driver.ex_remove_nic_from_load_balancer( |
| load_balancer=load_balancer, network_interface=network_interface |
| ) |
| self.assertTrue(detached) |
| |
| def test_ex_delete_load_balancer(self): |
| load_balancer = self.driver.ex_describe_load_balancer( |
| ex_href=("/cloudapi/v4/datacenters/" "dc-2/" "loadbalancers/bal-1") |
| ) |
| deleted = self.driver.ex_delete_load_balancer(load_balancer) |
| self.assertTrue(deleted) |
| |
| """ |
| Function tests for operations on IP blocks |
| """ |
| |
| def test_ex_list_ip_blocks(self): |
| ip_blocks = self.driver.ex_list_ip_blocks() |
| self.assertTrue(len(ip_blocks) > 0) |
| |
| def test_ex_create_ip_block(self): |
| location = self.driver.ex_describe_location(ex_location_id="us/las") |
| created = self.driver.ex_create_ip_block( |
| location=location, size=2, name="libcloud Test" |
| ) |
| extra = created.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(created.id, "ipb-1") |
| self.assertEqual(created.name, "libcloud Test") |
| self.assertEqual(created.href, "/cloudapi/v4/ipblocks/ipb-1") |
| self.assertEqual(created.location, "us/las") |
| self.assertEqual(created.size, 2) |
| self.assertEqual(len(created.ips), 2) |
| self.assertEqual(created.state, NodeState.PENDING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-26T15:05:36Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "acbf00bacf7ee48d4b8bc4e7413e1f30") |
| self.assertEqual(extra["last_modified_date"], "2016-10-26T15:05:36Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "BUSY") |
| |
| def test_ex_create_ip_block_failure(self): |
| with self.assertRaises(AttributeError): |
| "Raises attribute error if no location" |
| self.driver.ex_create_ip_block(location=None, size=2, name="libcloud Test") |
| |
| def test_ex_describe_ip_block(self): |
| ip_block_w_href = self.driver.ex_describe_ip_block( |
| ex_href=("/cloudapi/v4/ipblocks/" "ipb-2") |
| ) |
| ip_block_w_id = self.driver.ex_describe_ip_block(ex_ip_block_id="ipb-2") |
| self._verify_ip_block(ip_block=ip_block_w_href) |
| self._verify_ip_block(ip_block=ip_block_w_id) |
| |
| def test_ex_describe_ip_block_failure(self): |
| with self.assertRaises(BaseHTTPError) as cm: |
| self.driver.ex_describe_ip_block(ex_ip_block_id="00000000") |
| self.assertIn("Resource does not exist", cm.exception.message.value) |
| |
| def _verify_ip_block(self, ip_block): |
| extra = ip_block.extra |
| |
| """ |
| Standard properties |
| """ |
| self.assertEqual(ip_block.id, "ipb-2") |
| self.assertEqual(ip_block.name, "libcloud Test") |
| self.assertEqual(ip_block.href, ("/cloudapi/v4/ipblocks/ipb-2")) |
| self.assertEqual(ip_block.location, "us/las") |
| self.assertEqual(ip_block.size, 2) |
| self.assertEqual(len(ip_block.ips), 2) |
| self.assertEqual(ip_block.state, NodeState.RUNNING) |
| |
| """ |
| Extra metadata |
| """ |
| self.assertEqual(extra["created_date"], "2016-10-26T15:05:12Z") |
| self.assertEqual(extra["created_by"], "test@test.te") |
| self.assertEqual(extra["etag"], "43e05b766899950bc8a5aeee0fd89b05") |
| self.assertEqual(extra["last_modified_date"], "2016-10-26T15:05:12Z") |
| self.assertEqual(extra["last_modified_by"], "test@test.te") |
| self.assertEqual(extra["state"], "AVAILABLE") |
| |
| def test_ex_delete_ip_block(self): |
| ip_block = self.driver.ex_describe_ip_block( |
| ex_href=("/cloudapi/v4/ipblocks/" "ipb-2") |
| ) |
| deleted = self.driver.ex_delete_ip_block(ip_block) |
| self.assertTrue(deleted) |
| |
| |
| class ProfitBricksMockHttp(MockHttp): |
| |
| fixtures = ComputeFileFixtures("profitbricks") |
| |
| """ |
| Operations on images |
| |
| GET - fetches images |
| """ |
| |
| def _cloudapi_v4_images(self, method, url, body, headers): |
| body = self.fixtures.load("list_images.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| """ |
| Operations on locations |
| |
| GET - fetches locations |
| """ |
| |
| def _cloudapi_v4_locations(self, method, url, body, headers): |
| body = self.fixtures.load("list_locations.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| """ |
| Operations on a data centers |
| |
| GET - fetches data centers |
| PATCH - creates a data center |
| """ |
| |
| def _cloudapi_v4_datacenters(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_datacenters.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("ex_create_datacenter.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a data center |
| |
| GET - fetches a data center |
| DELETE - destroys a data center |
| PATCH - updates a data center |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_datacenter.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_rename_datacenter.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_00000000(self, method, url, body, headers): |
| return self._get_not_found() |
| |
| """ |
| Operations on data center nodes (servers) |
| |
| GET - fetches a list of nodes (servers) for a data center |
| POST - creates a node (server) for a data center |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("list_nodes.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("create_node.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on data center volumes |
| |
| GET - fetches a list of volumes for a data center |
| POST - creates a volume for a data center |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_volumes(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("list_volumes.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("create_volume.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a node (server) |
| |
| GET - fetches a node (server) |
| DELETE - destroys a node (server) |
| PATCH - updates a node |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_node.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_node.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_00000000( |
| self, method, url, body, headers |
| ): |
| return self._get_not_found() |
| |
| """ |
| Operations on a node (server) |
| |
| POST - reboots, then starts and stops a node |
| """ |
| "reboot a node" |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1_reboot( |
| self, method, url, body, headers |
| ): |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| "start a node" |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1_stop( |
| self, method, url, body, headers |
| ): |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| "stop a node" |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1_start( |
| self, method, url, body, headers |
| ): |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on an image |
| |
| GET - fetches an image |
| DELETE - deletes an image |
| PATCH - updates an image |
| """ |
| |
| def _cloudapi_v4_images_img_2(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_image.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_image.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_images_00000000(self, method, url, body, headers): |
| return self._get_not_found() |
| |
| """ |
| Operations on a volume |
| |
| GET - fetches a volume |
| DELETE - destroys a volume |
| PATCH - updates a volume |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_volumes_vol_2(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_volume.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_volume.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_1_volumes_00000000( |
| self, method, url, body, headers |
| ): |
| return self._get_not_found() |
| |
| """ |
| Operations on a volume connected to a node (server) |
| |
| DELETE - destroys the link between a volume |
| and a server but does delete the volume. |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1_volumes_vol_2( |
| self, method, url, body, headers |
| ): |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a location |
| |
| GET - fetches a location |
| """ |
| |
| def _cloudapi_v4_locations_us_las(self, method, url, body, headers): |
| body = self.fixtures.load("ex_describe_location.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| def _cloudapi_v4_locations_us_000(self, method, url, body, headers): |
| return self._get_not_found() |
| |
| """ |
| Operations on volumes connected to nodes (servers) |
| |
| GET - fetch volumes connected to a server |
| POST - attach a volume to a node (server) |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1_volumes( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_attached_volumes.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("attach_volume.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on network interfaces connected to a server |
| |
| GET - fetch network interfaces for a node (server) |
| POST - create a network interface for a node (server) |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_srv_1_nics( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_network_interfaces.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("ex_create_network_interface.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on network interfaces |
| |
| GET - fetch a network interface |
| DELETE - destroy a network interface |
| PATCH - update a network interface |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_s_3_nics_nic_2( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_network_interface.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_network_interface.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_s_3_nics_00000000( |
| self, method, url, body, headers |
| ): |
| return self._get_not_found() |
| |
| """ |
| Operations on firewall rules |
| |
| GET - fetch a firewall rule |
| DELETE - destroy a firewall rule |
| PATCH - update a firewall rule |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_s_3_nics_nic_2_firewallrules_fw2( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_firewall_rule.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_firewall_rule.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_s_3_nics_nic_2_firewallrules_00( |
| self, method, url, body, headers |
| ): |
| return self._get_not_found() |
| |
| """ |
| Operations on firewall rules connected to a network interface |
| |
| GET - fetch a list of firewall rules connected to a network interface |
| POST - create a firewall rule for a network interface |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_servers_s_3_nics_nic_2_firewallrules( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_firewall_rules.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("ex_create_firewall_rule.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on lans |
| |
| GET - fetch a list of lans |
| POST - create a lan |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_lans(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_lans.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("ex_create_lan.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a single lan |
| |
| GET - fetch a lan |
| DELETE - Destroy a lan |
| PATCH - update a lan |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_lans_10(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_lan.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_lan.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_1_lans_0(self, method, url, body, headers): |
| return self._get_not_found() |
| |
| """ |
| Operations on snapshots |
| |
| GET - fetch a list of snapshots |
| """ |
| |
| def _cloudapi_v4_snapshots(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("list_snapshots.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| """ |
| Operations on volume snapshots |
| |
| POST - create a volume snapshot |
| POST - restore a volume snapshot |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_volumes_vol_2_create_snapshot( |
| self, method, url, body, headers |
| ): |
| if method == "POST": |
| body = self.fixtures.load("create_volume_snapshot.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_1_volumes_vol_2_restore_snapshot( |
| self, method, url, body, headers |
| ): |
| if method == "POST": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a single snapshot |
| |
| GET - get information on a snapshot |
| DELETE - delete a snapshot |
| PATCH - update a snapshot |
| """ |
| |
| def _cloudapi_v4_snapshots_sshot(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_snapshot.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_snapshot.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_snapshots_00000000(self, method, url, body, headers): |
| return self._get_not_found() |
| |
| """ |
| Operations on load balancers |
| |
| GET - list load balancers |
| POST - create a load balancer for this datacenter |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_1_loadbalancers(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_load_balancers.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("ex_create_load_balancer.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a single load balancer |
| |
| GET - get information on a load balancer |
| DELETE - delete a load balancer |
| PATCH - update a load balancer |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_2_loadbalancers_bal_1( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_load_balancer.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| elif method == "PATCH": |
| body = self.fixtures.load("ex_update_load_balancer.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_datacenters_dc_2_loadbalancers_00000000( |
| self, method, url, body, headers |
| ): |
| return self._get_not_found() |
| |
| """ |
| Operations on a load balancers nics |
| |
| GET - get load balanced nics |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_2_loadbalancers_bal_1_balancednics( |
| self, method, url, body, headers |
| ): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_load_balanced_nics.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a load balanced nic |
| |
| DELETE - remove the nic from a load balancer |
| """ |
| |
| def _cloudapi_v4_datacenters_dc_2_loadbalancers_bal_1_balancednics_nic_2( |
| self, method, url, body, headers |
| ): |
| if method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on IP blocks |
| |
| GET - list IP blocks |
| POST - create an IP block |
| """ |
| |
| def _cloudapi_v4_ipblocks(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_list_ip_blocks.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "POST": |
| body = self.fixtures.load("ex_create_ip_block.json") |
| return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED]) |
| |
| """ |
| Operations on a single IP block |
| |
| GET - fetch an IP block |
| DELETE - delete an IP block |
| """ |
| |
| def _cloudapi_v4_ipblocks_ipb_2(self, method, url, body, headers): |
| if method == "GET": |
| body = self.fixtures.load("ex_describe_ip_block.json") |
| return (httplib.OK, body, {}, httplib.responses[httplib.OK]) |
| |
| elif method == "DELETE": |
| return (httplib.ACCEPTED, "", {}, httplib.responses[httplib.ACCEPTED]) |
| |
| def _cloudapi_v4_ipblocks_00000000(self, method, url, body, headers): |
| return self._get_not_found() |
| |
| def _get_not_found(self): |
| body = self.fixtures.load("error_resource_not_found.json") |
| return (httplib.NOT_FOUND, body, {}, httplib.responses[httplib.NOT_FOUND]) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(unittest.main()) |