blob: 0bd2b9759239a5bfd2654e0c5390fa652b2907bc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from types import GeneratorType
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import ET
from libcloud.common.types import InvalidCredsError
from libcloud.common.dimensiondata import (
DimensionDataAPIException,
NetworkDomainServicePlan,
)
from libcloud.common.dimensiondata import (
DimensionDataServerCpuSpecification,
DimensionDataServerDisk,
DimensionDataServerVMWareTools,
)
from libcloud.common.dimensiondata import DimensionDataTag, DimensionDataTagKey
from libcloud.common.dimensiondata import (
DimensionDataIpAddress,
DimensionDataIpAddressList,
DimensionDataChildIpAddressList,
DimensionDataPortList,
DimensionDataPort,
DimensionDataChildPortList,
)
from libcloud.common.dimensiondata import TYPES_URN
from libcloud.compute.drivers.dimensiondata import (
DimensionDataNodeDriver as DimensionData,
)
from libcloud.compute.drivers.dimensiondata import DimensionDataNic
from libcloud.compute.base import Node, NodeAuthPassword, NodeLocation
from libcloud.test import MockHttp, unittest
from libcloud.test.file_fixtures import ComputeFileFixtures
from libcloud.test.secrets import DIMENSIONDATA_PARAMS
from libcloud.utils.xml import fixxpath, findtext, findall
class DimensionData_v2_4_Tests(unittest.TestCase):
def setUp(self):
DimensionData.connectionCls.active_api_version = "2.4"
DimensionData.connectionCls.conn_class = DimensionDataMockHttp
DimensionDataMockHttp.type = None
self.driver = DimensionData(*DIMENSIONDATA_PARAMS)
def test_invalid_region(self):
with self.assertRaises(ValueError):
DimensionData(*DIMENSIONDATA_PARAMS, region="blah")
def test_invalid_creds(self):
DimensionDataMockHttp.type = "UNAUTHORIZED"
with self.assertRaises(InvalidCredsError):
self.driver.list_nodes()
def test_get_account_details(self):
DimensionDataMockHttp.type = None
ret = self.driver.connection.get_account_details()
self.assertEqual(ret.full_name, "Test User")
self.assertEqual(ret.first_name, "Test")
self.assertEqual(ret.email, "test@example.com")
def test_list_locations_response(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_locations()
self.assertEqual(len(ret), 5)
first_loc = ret[0]
self.assertEqual(first_loc.id, "NA3")
self.assertEqual(first_loc.name, "US - West")
self.assertEqual(first_loc.country, "US")
def test_list_nodes_response(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_nodes()
self.assertEqual(len(ret), 7)
def test_node_extras(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_nodes()
self.assertTrue(
isinstance(ret[0].extra["vmWareTools"], DimensionDataServerVMWareTools)
)
self.assertTrue(
isinstance(ret[0].extra["cpu"], DimensionDataServerCpuSpecification)
)
self.assertTrue(isinstance(ret[0].extra["disks"], list))
self.assertTrue(isinstance(ret[0].extra["disks"][0], DimensionDataServerDisk))
self.assertEqual(ret[0].extra["disks"][0].size_gb, 10)
self.assertTrue(isinstance(ret[1].extra["disks"], list))
self.assertTrue(isinstance(ret[1].extra["disks"][0], DimensionDataServerDisk))
self.assertEqual(ret[1].extra["disks"][0].size_gb, 10)
def test_server_states(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_nodes()
self.assertTrue(ret[0].state == "running")
self.assertTrue(ret[1].state == "starting")
self.assertTrue(ret[2].state == "stopping")
self.assertTrue(ret[3].state == "reconfiguring")
self.assertTrue(ret[4].state == "running")
self.assertTrue(ret[5].state == "terminated")
self.assertTrue(ret[6].state == "stopped")
self.assertEqual(len(ret), 7)
def test_list_nodes_response_PAGINATED(self):
DimensionDataMockHttp.type = "PAGINATED"
ret = self.driver.list_nodes()
self.assertEqual(len(ret), 9)
def test_paginated_mcp2_call_EMPTY(self):
# cache org
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "EMPTY"
node_list_generator = self.driver.connection.paginated_request_with_orgId_api_2(
"server/server"
)
empty_node_list = []
for node_list in node_list_generator:
empty_node_list.extend(node_list)
self.assertTrue(len(empty_node_list) == 0)
def test_paginated_mcp2_call_PAGED_THEN_EMPTY(self):
# cache org
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "PAGED_THEN_EMPTY"
node_list_generator = self.driver.connection.paginated_request_with_orgId_api_2(
"server/server"
)
final_node_list = []
for node_list in node_list_generator:
final_node_list.extend(node_list)
self.assertTrue(len(final_node_list) == 2)
def test_paginated_mcp2_call_with_page_size(self):
# cache org
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "PAGESIZE50"
node_list_generator = self.driver.connection.paginated_request_with_orgId_api_2(
"server/server", page_size=50
)
self.assertTrue(isinstance(node_list_generator, GeneratorType))
# We're making sure here the filters make it to the URL
# See _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_ALLFILTERS for asserts
def test_list_nodes_response_strings_ALLFILTERS(self):
DimensionDataMockHttp.type = "ALLFILTERS"
ret = self.driver.list_nodes(
ex_location="fake_loc",
ex_name="fake_name",
ex_ipv6="fake_ipv6",
ex_ipv4="fake_ipv4",
ex_vlan="fake_vlan",
ex_image="fake_image",
ex_deployed=True,
ex_started=True,
ex_state="fake_state",
ex_network="fake_network",
ex_network_domain="fake_network_domain",
)
self.assertTrue(isinstance(ret, list))
self.assertEqual(len(ret), 7)
node = ret[3]
self.assertTrue(isinstance(node.extra["disks"], list))
self.assertTrue(isinstance(node.extra["disks"][0], DimensionDataServerDisk))
self.assertEqual(node.size.id, "1")
self.assertEqual(node.image.id, "3ebf3c0f-90fe-4a8b-8585-6e65b316592c")
self.assertEqual(node.image.name, "WIN2008S/32")
disk = node.extra["disks"][0]
self.assertEqual(disk.id, "c2e1f199-116e-4dbc-9960-68720b832b0a")
self.assertEqual(disk.scsi_id, 0)
self.assertEqual(disk.size_gb, 50)
self.assertEqual(disk.speed, "STANDARD")
self.assertEqual(disk.state, "NORMAL")
def test_list_nodes_response_LOCATION(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_locations()
first_loc = ret[0]
ret = self.driver.list_nodes(ex_location=first_loc)
for node in ret:
self.assertEqual(node.extra["datacenterId"], "NA3")
def test_list_nodes_response_LOCATION_STR(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_nodes(ex_location="NA3")
for node in ret:
self.assertEqual(node.extra["datacenterId"], "NA3")
def test_list_sizes_response(self):
DimensionDataMockHttp.type = None
ret = self.driver.list_sizes()
self.assertEqual(len(ret), 1)
size = ret[0]
self.assertEqual(size.name, "default")
def test_reboot_node_response(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = node.reboot()
self.assertTrue(ret is True)
def test_reboot_node_response_INPROGRESS(self):
DimensionDataMockHttp.type = "INPROGRESS"
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
with self.assertRaises(DimensionDataAPIException):
node.reboot()
def test_destroy_node_response(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = node.destroy()
self.assertTrue(ret is True)
def test_destroy_node_response_RESOURCE_BUSY(self):
DimensionDataMockHttp.type = "INPROGRESS"
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
with self.assertRaises(DimensionDataAPIException):
node.destroy()
def test_list_images(self):
images = self.driver.list_images()
self.assertEqual(len(images), 3)
self.assertEqual(images[0].name, "RedHat 6 64-bit 2 CPU")
self.assertEqual(images[0].id, "c14b1a46-2428-44c1-9c1a-b20e6418d08c")
self.assertEqual(images[0].extra["location"].id, "NA9")
self.assertEqual(images[0].extra["cpu"].cpu_count, 2)
self.assertEqual(images[0].extra["OS_displayName"], "REDHAT6/64")
def test_clean_failed_deployment_response_with_node(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = self.driver.ex_clean_failed_deployment(node)
self.assertTrue(ret is True)
def test_clean_failed_deployment_response_with_node_id(self):
node = "e75ead52-692f-4314-8725-c8a4f4d13a87"
ret = self.driver.ex_clean_failed_deployment(node)
self.assertTrue(ret is True)
def test_ex_list_customer_images(self):
images = self.driver.ex_list_customer_images()
self.assertEqual(len(images), 3)
self.assertEqual(images[0].name, "ImportedCustomerImage")
self.assertEqual(images[0].id, "5234e5c7-01de-4411-8b6e-baeb8d91cf5d")
self.assertEqual(images[0].extra["location"].id, "NA9")
self.assertEqual(images[0].extra["cpu"].cpu_count, 4)
self.assertEqual(images[0].extra["OS_displayName"], "REDHAT6/64")
def test_create_mcp1_node_optional_param(self):
root_pw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
network = self.driver.ex_list_networks()[0]
cpu_spec = DimensionDataServerCpuSpecification(
cpu_count="4", cores_per_socket="2", performance="STANDARD"
)
disks = [DimensionDataServerDisk(scsi_id="0", speed="HIGHPERFORMANCE")]
node = self.driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
ex_memory_gb=8,
ex_disks=disks,
ex_cpu_specification=cpu_spec,
ex_primary_dns="10.0.0.5",
ex_secondary_dns="10.0.0.6",
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_mcp1_node_response_no_pass_random_gen(self):
image = self.driver.list_images()[0]
network = self.driver.ex_list_networks()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
self.assertTrue("password" in node.extra)
def test_create_mcp1_node_response_no_pass_customer_windows(self):
image = self.driver.ex_list_customer_images()[1]
network = self.driver.ex_list_networks()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
self.assertTrue("password" in node.extra)
def test_create_mcp1_node_response_no_pass_customer_windows_STR(self):
image = self.driver.ex_list_customer_images()[1].id
network = self.driver.ex_list_networks()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
self.assertTrue("password" in node.extra)
def test_create_mcp1_node_response_no_pass_customer_linux(self):
image = self.driver.ex_list_customer_images()[0]
network = self.driver.ex_list_networks()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
self.assertTrue("password" not in node.extra)
def test_create_mcp1_node_response_no_pass_customer_linux_STR(self):
image = self.driver.ex_list_customer_images()[0].id
network = self.driver.ex_list_networks()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
self.assertTrue("password" not in node.extra)
def test_create_mcp1_node_response_STR(self):
rootPw = "pass123"
image = self.driver.list_images()[0].id
network = self.driver.ex_list_networks()[0].id
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_response_network_domain(self):
rootPw = NodeAuthPassword("pass123")
location = self.driver.ex_get_location_by_id("NA9")
image = self.driver.list_images(location=location)[0]
network_domain = self.driver.ex_list_network_domains(location=location)[0]
vlan = self.driver.ex_list_vlans(location=location)[0]
cpu = DimensionDataServerCpuSpecification(
cpu_count=4, cores_per_socket=1, performance="HIGHPERFORMANCE"
)
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain=network_domain,
ex_vlan=vlan,
ex_is_started=False,
ex_cpu_specification=cpu,
ex_memory_gb=4,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_response_network_domain_STR(self):
rootPw = NodeAuthPassword("pass123")
location = self.driver.ex_get_location_by_id("NA9")
image = self.driver.list_images(location=location)[0]
network_domain = self.driver.ex_list_network_domains(location=location)[0].id
vlan = self.driver.ex_list_vlans(location=location)[0].id
cpu = DimensionDataServerCpuSpecification(
cpu_count=4, cores_per_socket=1, performance="HIGHPERFORMANCE"
)
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain=network_domain,
ex_vlan=vlan,
ex_is_started=False,
ex_cpu_specification=cpu,
ex_memory_gb=4,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_mcp1_node_no_network(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(InvalidRequestError):
self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network=None,
ex_is_started=False,
)
def test_create_node_mcp1_ipv4(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network="fakenetwork",
ex_primary_ipv4="10.0.0.1",
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_mcp1_network(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network="fakenetwork",
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_mcp2_vlan(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_vlan="fakevlan",
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_mcp2_ipv4(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_ipv4="10.0.0.1",
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_network_domain_no_vlan_or_ipv4(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_is_started=False,
)
def test_create_node_response(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_vlan="fakevlan",
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_ms_time_zone(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_vlan="fakevlan",
ex_microsoft_time_zone="040",
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_ambigious_mcps_fail(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_network="fakenetwork",
ex_primary_nic_vlan="fakevlan",
)
def test_create_node_no_network_domain_fail(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test3", image=image, auth=rootPw, ex_primary_nic_vlan="fakevlan"
)
def test_create_node_no_primary_nic_fail(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
)
def test_create_node_primary_vlan_nic(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_vlan="fakevlan",
ex_primary_nic_network_adapter="v1000",
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_primary_ipv4(self):
rootPw = "pass123"
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_both_primary_nic_and_vlan_fail(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_primary_nic_vlan="fakevlan",
)
def test_create_node_cpu_specification(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
cpu_spec = DimensionDataServerCpuSpecification(
cpu_count="4", cores_per_socket="2", performance="STANDARD"
)
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_cpu_specification=cpu_spec,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_memory(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_memory_gb=8,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_disks(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
disks = [DimensionDataServerDisk(scsi_id="0", speed="HIGHPERFORMANCE")]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_disks=disks,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_disks_fail(self):
root_pw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
disks = "blah"
with self.assertRaises(TypeError):
self.driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_disks=disks,
)
def test_create_node_ipv4_gateway(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_ipv4_gateway="10.2.2.2",
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_network_domain_no_vlan_no_ipv4_fail(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_is_started=False,
)
def test_create_node_mcp2_additional_nics_legacy(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
additional_vlans = ["fakevlan1", "fakevlan2"]
additional_ipv4 = ["10.0.0.2", "10.0.0.3"]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_ipv4="10.0.0.1",
ex_additional_nics_vlan=additional_vlans,
ex_additional_nics_ipv4=additional_ipv4,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_bad_additional_nics_ipv4(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(TypeError):
self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_vlan="fake_vlan",
ex_additional_nics_ipv4="badstring",
ex_is_started=False,
)
def test_create_node_additional_nics(self):
root_pw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
nic1 = DimensionDataNic(vlan="fake_vlan", network_adapter_name="v1000")
nic2 = DimensionDataNic(private_ip_v4="10.1.1.2", network_adapter_name="v1000")
additional_nics = [nic1, nic2]
node = self.driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_additional_nics_vlan_ipv4_coexist_fail(self):
root_pw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
nic1 = DimensionDataNic(
private_ip_v4="10.1.1.1", vlan="fake_vlan", network_adapter_name="v1000"
)
nic2 = DimensionDataNic(
private_ip_v4="10.1.1.2", vlan="fake_vlan2", network_adapter_name="v1000"
)
additional_nics = [nic1, nic2]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
def test_create_node_additional_nics_invalid_input_fail(self):
root_pw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
additional_nics = "blah"
with self.assertRaises(TypeError):
self.driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
def test_create_node_additional_nics_vlan_ipv4_not_exist_fail(self):
root_pw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
nic1 = DimensionDataNic(network_adapter_name="v1000")
nic2 = DimensionDataNic(network_adapter_name="v1000")
additional_nics = [nic1, nic2]
with self.assertRaises(ValueError):
self.driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
def test_create_node_bad_additional_nics_vlan(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
with self.assertRaises(TypeError):
self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_vlan="fake_vlan",
ex_additional_nics_vlan="badstring",
ex_is_started=False,
)
def test_create_node_mcp2_indicate_dns(self):
rootPw = NodeAuthPassword("pass123")
image = self.driver.list_images()[0]
node = self.driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test node dns",
ex_network_domain="fakenetworkdomain",
ex_primary_ipv4="10.0.0.1",
ex_primary_dns="8.8.8.8",
ex_secondary_dns="8.8.4.4",
ex_is_started=False,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
def test_ex_shutdown_graceful(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = self.driver.ex_shutdown_graceful(node)
self.assertTrue(ret is True)
def test_ex_shutdown_graceful_INPROGRESS(self):
DimensionDataMockHttp.type = "INPROGRESS"
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_shutdown_graceful(node)
def test_ex_start_node(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = self.driver.ex_start_node(node)
self.assertTrue(ret is True)
def test_ex_start_node_INPROGRESS(self):
DimensionDataMockHttp.type = "INPROGRESS"
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_start_node(node)
def test_ex_power_off(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = self.driver.ex_power_off(node)
self.assertTrue(ret is True)
def test_ex_update_vm_tools(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = self.driver.ex_update_vm_tools(node)
self.assertTrue(ret is True)
def test_ex_power_off_INPROGRESS(self):
DimensionDataMockHttp.type = "INPROGRESS"
node = Node(
id="11",
name=None,
state="STOPPING",
public_ips=None,
private_ips=None,
driver=self.driver,
)
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_power_off(node)
def test_ex_reset(self):
node = Node(
id="11",
name=None,
state=None,
public_ips=None,
private_ips=None,
driver=self.driver,
)
ret = self.driver.ex_reset(node)
self.assertTrue(ret is True)
def test_ex_attach_node_to_vlan(self):
node = self.driver.ex_get_node_by_id("e75ead52-692f-4314-8725-c8a4f4d13a87")
vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
ret = self.driver.ex_attach_node_to_vlan(node, vlan)
self.assertTrue(ret is True)
def test_ex_destroy_nic(self):
node = self.driver.ex_destroy_nic("a202e51b-41c0-4cfc-add0-b1c62fc0ecf6")
self.assertTrue(node)
def test_list_networks(self):
nets = self.driver.list_networks()
self.assertEqual(nets[0].name, "test-net1")
self.assertTrue(isinstance(nets[0].location, NodeLocation))
def test_ex_create_network(self):
location = self.driver.ex_get_location_by_id("NA9")
net = self.driver.ex_create_network(location, "Test Network", "test")
self.assertEqual(net.id, "208e3a8e-9d2f-11e2-b29c-001517c4643e")
self.assertEqual(net.name, "Test Network")
def test_ex_create_network_NO_DESCRIPTION(self):
location = self.driver.ex_get_location_by_id("NA9")
net = self.driver.ex_create_network(location, "Test Network")
self.assertEqual(net.id, "208e3a8e-9d2f-11e2-b29c-001517c4643e")
self.assertEqual(net.name, "Test Network")
def test_ex_delete_network(self):
net = self.driver.ex_list_networks()[0]
result = self.driver.ex_delete_network(net)
self.assertTrue(result)
def test_ex_rename_network(self):
net = self.driver.ex_list_networks()[0]
result = self.driver.ex_rename_network(net, "barry")
self.assertTrue(result)
def test_ex_create_network_domain(self):
location = self.driver.ex_get_location_by_id("NA9")
plan = NetworkDomainServicePlan.ADVANCED
net = self.driver.ex_create_network_domain(
location=location, name="test", description="test", service_plan=plan
)
self.assertEqual(net.name, "test")
self.assertTrue(net.id, "f14a871f-9a25-470c-aef8-51e13202e1aa")
def test_ex_create_network_domain_NO_DESCRIPTION(self):
location = self.driver.ex_get_location_by_id("NA9")
plan = NetworkDomainServicePlan.ADVANCED
net = self.driver.ex_create_network_domain(
location=location, name="test", service_plan=plan
)
self.assertEqual(net.name, "test")
self.assertTrue(net.id, "f14a871f-9a25-470c-aef8-51e13202e1aa")
def test_ex_get_network_domain(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
self.assertEqual(net.id, "8cdfd607-f429-4df6-9352-162cfc0891be")
self.assertEqual(net.description, "test2")
self.assertEqual(net.name, "test")
def test_ex_update_network_domain(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
net.name = "new name"
net2 = self.driver.ex_update_network_domain(net)
self.assertEqual(net2.name, "new name")
def test_ex_delete_network_domain(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
result = self.driver.ex_delete_network_domain(net)
self.assertTrue(result)
def test_ex_list_networks(self):
nets = self.driver.ex_list_networks()
self.assertEqual(nets[0].name, "test-net1")
self.assertTrue(isinstance(nets[0].location, NodeLocation))
def test_ex_list_network_domains(self):
nets = self.driver.ex_list_network_domains()
self.assertEqual(nets[0].name, "Aurora")
self.assertTrue(isinstance(nets[0].location, NodeLocation))
def test_ex_list_network_domains_ALLFILTERS(self):
DimensionDataMockHttp.type = "ALLFILTERS"
nets = self.driver.ex_list_network_domains(
location="fake_location",
name="fake_name",
service_plan="fake_plan",
state="fake_state",
)
self.assertEqual(nets[0].name, "Aurora")
self.assertTrue(isinstance(nets[0].location, NodeLocation))
def test_ex_list_vlans(self):
vlans = self.driver.ex_list_vlans()
self.assertEqual(vlans[0].name, "Primary")
def test_ex_list_vlans_ALLFILTERS(self):
DimensionDataMockHttp.type = "ALLFILTERS"
vlans = self.driver.ex_list_vlans(
location="fake_location",
network_domain="fake_network_domain",
name="fake_name",
ipv4_address="fake_ipv4",
ipv6_address="fake_ipv6",
state="fake_state",
)
self.assertEqual(vlans[0].name, "Primary")
def test_ex_create_vlan(
self,
):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
vlan = self.driver.ex_create_vlan(
network_domain=net,
name="test",
private_ipv4_base_address="10.3.4.0",
private_ipv4_prefix_size="24",
description="test vlan",
)
self.assertEqual(vlan.id, "0e56433f-d808-4669-821d-812769517ff8")
def test_ex_create_vlan_NO_DESCRIPTION(
self,
):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
vlan = self.driver.ex_create_vlan(
network_domain=net,
name="test",
private_ipv4_base_address="10.3.4.0",
private_ipv4_prefix_size="24",
)
self.assertEqual(vlan.id, "0e56433f-d808-4669-821d-812769517ff8")
def test_ex_get_vlan(self):
vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
self.assertEqual(vlan.id, "0e56433f-d808-4669-821d-812769517ff8")
self.assertEqual(vlan.description, "test2")
self.assertEqual(vlan.status, "NORMAL")
self.assertEqual(vlan.name, "Production VLAN")
self.assertEqual(vlan.private_ipv4_range_address, "10.0.3.0")
self.assertEqual(vlan.private_ipv4_range_size, 24)
self.assertEqual(vlan.ipv6_range_size, 64)
self.assertEqual(vlan.ipv6_range_address, "2607:f480:1111:1153:0:0:0:0")
self.assertEqual(vlan.ipv4_gateway, "10.0.3.1")
self.assertEqual(vlan.ipv6_gateway, "2607:f480:1111:1153:0:0:0:1")
def test_ex_wait_for_state(self):
self.driver.ex_wait_for_state(
"NORMAL",
self.driver.ex_get_vlan,
vlan_id="0e56433f-d808-4669-821d-812769517ff8",
poll_interval=0.1,
)
def test_ex_wait_for_state_NODE(self):
self.driver.ex_wait_for_state(
"running",
self.driver.ex_get_node_by_id,
id="e75ead52-692f-4314-8725-c8a4f4d13a87",
poll_interval=0.1,
)
def test_ex_wait_for_state_FAIL(self):
with self.assertRaises(DimensionDataAPIException) as context:
self.driver.ex_wait_for_state(
"starting",
self.driver.ex_get_node_by_id,
id="e75ead52-692f-4314-8725-c8a4f4d13a87",
poll_interval=0.1,
timeout=0.1,
)
self.assertEqual(context.exception.code, "running")
self.assertTrue("timed out" in context.exception.msg)
def test_ex_update_vlan(self):
vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
vlan.name = "new name"
vlan2 = self.driver.ex_update_vlan(vlan)
self.assertEqual(vlan2.name, "new name")
def test_ex_delete_vlan(self):
vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
result = self.driver.ex_delete_vlan(vlan)
self.assertTrue(result)
def test_ex_expand_vlan(self):
vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
vlan.private_ipv4_range_size = "23"
vlan = self.driver.ex_expand_vlan(vlan)
self.assertEqual(vlan.private_ipv4_range_size, "23")
def test_ex_add_public_ip_block_to_network_domain(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
block = self.driver.ex_add_public_ip_block_to_network_domain(net)
self.assertEqual(block.id, "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
def test_ex_list_public_ip_blocks(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
blocks = self.driver.ex_list_public_ip_blocks(net)
self.assertEqual(blocks[0].base_ip, "168.128.4.18")
self.assertEqual(blocks[0].size, "2")
self.assertEqual(blocks[0].id, "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
self.assertEqual(blocks[0].location.id, "NA9")
self.assertEqual(blocks[0].network_domain.id, net.id)
def test_ex_get_public_ip_block(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
block = self.driver.ex_get_public_ip_block(
"9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8"
)
self.assertEqual(block.base_ip, "168.128.4.18")
self.assertEqual(block.size, "2")
self.assertEqual(block.id, "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
self.assertEqual(block.location.id, "NA9")
self.assertEqual(block.network_domain.id, net.id)
def test_ex_delete_public_ip_block(self):
block = self.driver.ex_get_public_ip_block(
"9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8"
)
result = self.driver.ex_delete_public_ip_block(block)
self.assertTrue(result)
def test_ex_list_firewall_rules(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
self.assertEqual(rules[0].id, "756cba02-b0bc-48f4-aea5-9445870b6148")
self.assertEqual(
rules[0].network_domain.id, "8cdfd607-f429-4df6-9352-162cfc0891be"
)
self.assertEqual(rules[0].name, "CCDEFAULT.BlockOutboundMailIPv4")
self.assertEqual(rules[0].action, "DROP")
self.assertEqual(rules[0].ip_version, "IPV4")
self.assertEqual(rules[0].protocol, "TCP")
self.assertEqual(rules[0].source.ip_address, "ANY")
self.assertTrue(rules[0].source.any_ip)
self.assertTrue(rules[0].destination.any_ip)
def test_ex_create_firewall_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
rule = self.driver.ex_create_firewall_rule(net, rules[0], "FIRST")
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_with_specific_source_ip(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
specific_source_ip_rule = list(
filter(lambda x: x.name == "SpecificSourceIP", rules)
)[0]
rule = self.driver.ex_create_firewall_rule(
net, specific_source_ip_rule, "FIRST"
)
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_with_source_ip(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
specific_source_ip_rule = list(
filter(lambda x: x.name == "SpecificSourceIP", rules)
)[0]
specific_source_ip_rule.source.any_ip = False
specific_source_ip_rule.source.ip_address = "10.0.0.1"
specific_source_ip_rule.source.ip_prefix_size = "15"
rule = self.driver.ex_create_firewall_rule(
net, specific_source_ip_rule, "FIRST"
)
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_with_any_ip(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
specific_source_ip_rule = list(
filter(lambda x: x.name == "SpecificSourceIP", rules)
)[0]
specific_source_ip_rule.source.any_ip = True
rule = self.driver.ex_create_firewall_rule(
net, specific_source_ip_rule, "FIRST"
)
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_ip_prefix_size(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_list_firewall_rules(net)[0]
rule.source.address_list_id = None
rule.source.any_ip = False
rule.source.ip_address = "10.2.1.1"
rule.source.ip_prefix_size = "10"
rule.destination.address_list_id = None
rule.destination.any_ip = False
rule.destination.ip_address = "10.0.0.1"
rule.destination.ip_prefix_size = "20"
self.driver.ex_create_firewall_rule(net, rule, "LAST")
def test_ex_create_firewall_rule_address_list(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_list_firewall_rules(net)[0]
rule.source.address_list_id = "12345"
rule.destination.address_list_id = "12345"
self.driver.ex_create_firewall_rule(net, rule, "LAST")
def test_ex_create_firewall_rule_port_list(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_list_firewall_rules(net)[0]
rule.source.port_list_id = "12345"
rule.destination.port_list_id = "12345"
self.driver.ex_create_firewall_rule(net, rule, "LAST")
def test_ex_create_firewall_rule_port(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_list_firewall_rules(net)[0]
rule.source.port_list_id = None
rule.source.port_begin = "8000"
rule.source.port_end = "8005"
rule.destination.port_list_id = None
rule.destination.port_begin = "7000"
rule.destination.port_end = "7005"
self.driver.ex_create_firewall_rule(net, rule, "LAST")
def test_ex_create_firewall_rule_ALL_VALUES(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
for rule in rules:
self.driver.ex_create_firewall_rule(net, rule, "LAST")
def test_ex_create_firewall_rule_WITH_POSITION_RULE(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
rule = self.driver.ex_create_firewall_rule(net, rules[-2], "BEFORE", rules[-1])
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_WITH_POSITION_RULE_STR(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
rule = self.driver.ex_create_firewall_rule(
net, rules[-2], "BEFORE", "RULE_WITH_SOURCE_AND_DEST"
)
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_FAIL_POSITION(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
with self.assertRaises(ValueError):
self.driver.ex_create_firewall_rule(net, rules[0], "BEFORE")
def test_ex_create_firewall_rule_FAIL_POSITION_WITH_RULE(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_firewall_rules(net)
with self.assertRaises(ValueError):
self.driver.ex_create_firewall_rule(
net, rules[0], "LAST", "RULE_WITH_SOURCE_AND_DEST"
)
def test_ex_get_firewall_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
self.assertEqual(rule.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_set_firewall_rule_state(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
result = self.driver.ex_set_firewall_rule_state(rule, False)
self.assertTrue(result)
def test_ex_delete_firewall_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
result = self.driver.ex_delete_firewall_rule(rule)
self.assertTrue(result)
def test_ex_edit_firewall_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.source.any_ip = True
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_source_ipaddresslist(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.source.address_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
rule.source.any_ip = False
rule.source.ip_address = "10.0.0.1"
rule.source.ip_prefix_size = 10
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_destination_ipaddresslist(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.destination.address_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
rule.destination.any_ip = False
rule.destination.ip_address = "10.0.0.1"
rule.destination.ip_prefix_size = 10
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_destination_ipaddress(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.source.address_list_id = None
rule.source.any_ip = False
rule.source.ip_address = "10.0.0.1"
rule.source.ip_prefix_size = "10"
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_source_ipaddress(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.destination.address_list_id = None
rule.destination.any_ip = False
rule.destination.ip_address = "10.0.0.1"
rule.destination.ip_prefix_size = "10"
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_with_relative_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
placement_rule = self.driver.ex_list_firewall_rules(network_domain=net)[-1]
result = self.driver.ex_edit_firewall_rule(
rule=rule, position="BEFORE", relative_rule_for_position=placement_rule
)
self.assertTrue(result)
def test_ex_edit_firewall_rule_with_relative_rule_by_name(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
placement_rule = self.driver.ex_list_firewall_rules(network_domain=net)[-1]
result = self.driver.ex_edit_firewall_rule(
rule=rule, position="BEFORE", relative_rule_for_position=placement_rule.name
)
self.assertTrue(result)
def test_ex_edit_firewall_rule_source_portlist(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.source.port_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_source_port(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.source.port_list_id = None
rule.source.port_begin = "3"
rule.source.port_end = "10"
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_destination_portlist(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.destination.port_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_destination_port(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
rule.destination.port_list_id = None
rule.destination.port_begin = "3"
rule.destination.port_end = "10"
result = self.driver.ex_edit_firewall_rule(rule=rule, position="LAST")
self.assertTrue(result)
def test_ex_edit_firewall_rule_invalid_position_fail(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
with self.assertRaises(ValueError):
self.driver.ex_edit_firewall_rule(rule=rule, position="BEFORE")
def test_ex_edit_firewall_rule_invalid_position_relative_rule_fail(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_firewall_rule(
net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
)
relative_rule = self.driver.ex_list_firewall_rules(network_domain=net)[-1]
with self.assertRaises(ValueError):
self.driver.ex_edit_firewall_rule(
rule=rule, position="FIRST", relative_rule_for_position=relative_rule
)
def test_ex_create_nat_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_create_nat_rule(net, "1.2.3.4", "4.3.2.1")
self.assertEqual(rule.id, "d31c2db0-be6b-4d50-8744-9a7a534b5fba")
def test_ex_list_nat_rules(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = self.driver.ex_list_nat_rules(net)
self.assertEqual(rules[0].id, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
self.assertEqual(rules[0].internal_ip, "10.0.0.15")
self.assertEqual(rules[0].external_ip, "165.180.12.18")
def test_ex_get_nat_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_nat_rule(net, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
self.assertEqual(rule.id, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
self.assertEqual(rule.internal_ip, "10.0.0.16")
self.assertEqual(rule.external_ip, "165.180.12.19")
def test_ex_delete_nat_rule(self):
net = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = self.driver.ex_get_nat_rule(net, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
result = self.driver.ex_delete_nat_rule(rule)
self.assertTrue(result)
def test_ex_enable_monitoring(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_enable_monitoring(node, "ADVANCED")
self.assertTrue(result)
def test_ex_disable_monitoring(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_disable_monitoring(node)
self.assertTrue(result)
def test_ex_change_monitoring_plan(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_update_monitoring_plan(node, "ESSENTIALS")
self.assertTrue(result)
def test_ex_add_storage_to_node(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_add_storage_to_node(node, 30, "PERFORMANCE")
self.assertTrue(result)
def test_ex_remove_storage_from_node(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_remove_storage_from_node(node, 0)
self.assertTrue(result)
def test_ex_change_storage_speed(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_change_storage_speed(node, 1, "PERFORMANCE")
self.assertTrue(result)
def test_ex_change_storage_size(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_change_storage_size(node, 1, 100)
self.assertTrue(result)
def test_ex_clone_node_to_image(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_clone_node_to_image(node, "my image", "a description")
self.assertTrue(result)
def test_ex_update_node(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_update_node(
node, "my new name", "a description", 2, 4048
)
self.assertTrue(result)
def test_ex_reconfigure_node(self):
node = self.driver.list_nodes()[0]
result = self.driver.ex_reconfigure_node(node, 4, 4, 1, "HIGHPERFORMANCE")
self.assertTrue(result)
def test_ex_get_location_by_id(self):
location = self.driver.ex_get_location_by_id("NA9")
self.assertTrue(location.id, "NA9")
def test_ex_get_location_by_id_NO_LOCATION(self):
location = self.driver.ex_get_location_by_id(None)
self.assertIsNone(location)
def test_ex_get_base_image_by_id(self):
image_id = self.driver.list_images()[0].id
image = self.driver.ex_get_base_image_by_id(image_id)
self.assertEqual(image.extra["OS_type"], "UNIX")
def test_ex_get_customer_image_by_id(self):
image_id = self.driver.ex_list_customer_images()[1].id
image = self.driver.ex_get_customer_image_by_id(image_id)
self.assertEqual(image.extra["OS_type"], "WINDOWS")
def test_ex_get_image_by_id_base_img(self):
image_id = self.driver.list_images()[1].id
image = self.driver.ex_get_base_image_by_id(image_id)
self.assertEqual(image.extra["OS_type"], "WINDOWS")
def test_ex_get_image_by_id_customer_img(self):
image_id = self.driver.ex_list_customer_images()[0].id
image = self.driver.ex_get_customer_image_by_id(image_id)
self.assertEqual(image.extra["OS_type"], "UNIX")
def test_ex_get_image_by_id_customer_FAIL(self):
image_id = "FAKE_IMAGE_ID"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_get_base_image_by_id(image_id)
def test_ex_create_anti_affinity_rule(self):
node_list = self.driver.list_nodes()
success = self.driver.ex_create_anti_affinity_rule([node_list[0], node_list[1]])
self.assertTrue(success)
def test_ex_create_anti_affinity_rule_TUPLE(self):
node_list = self.driver.list_nodes()
success = self.driver.ex_create_anti_affinity_rule((node_list[0], node_list[1]))
self.assertTrue(success)
def test_ex_create_anti_affinity_rule_TUPLE_STR(self):
node_list = self.driver.list_nodes()
success = self.driver.ex_create_anti_affinity_rule(
(node_list[0].id, node_list[1].id)
)
self.assertTrue(success)
def test_ex_create_anti_affinity_rule_FAIL_STR(self):
node_list = "string"
with self.assertRaises(TypeError):
self.driver.ex_create_anti_affinity_rule(node_list)
def test_ex_create_anti_affinity_rule_FAIL_EXISTING(self):
node_list = self.driver.list_nodes()
DimensionDataMockHttp.type = "FAIL_EXISTING"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_create_anti_affinity_rule((node_list[0], node_list[1]))
def test_ex_delete_anti_affinity_rule(self):
net_domain = self.driver.ex_list_network_domains()[0]
rule = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)[0]
success = self.driver.ex_delete_anti_affinity_rule(rule)
self.assertTrue(success)
def test_ex_delete_anti_affinity_rule_STR(self):
net_domain = self.driver.ex_list_network_domains()[0]
rule = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)[0]
success = self.driver.ex_delete_anti_affinity_rule(rule.id)
self.assertTrue(success)
def test_ex_delete_anti_affinity_rule_FAIL(self):
net_domain = self.driver.ex_list_network_domains()[0]
rule = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)[0]
DimensionDataMockHttp.type = "FAIL"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_delete_anti_affinity_rule(rule)
def test_ex_list_anti_affinity_rules_NETWORK_DOMAIN(self):
net_domain = self.driver.ex_list_network_domains()[0]
rules = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)
self.assertTrue(isinstance(rules, list))
self.assertEqual(len(rules), 2)
self.assertTrue(isinstance(rules[0].id, str))
self.assertTrue(isinstance(rules[0].node_list, list))
def test_ex_list_anti_affinity_rules_NETWORK(self):
network = self.driver.list_networks()[0]
rules = self.driver.ex_list_anti_affinity_rules(network=network)
self.assertTrue(isinstance(rules, list))
self.assertEqual(len(rules), 2)
self.assertTrue(isinstance(rules[0].id, str))
self.assertTrue(isinstance(rules[0].node_list, list))
def test_ex_list_anti_affinity_rules_NODE(self):
node = self.driver.list_nodes()[0]
rules = self.driver.ex_list_anti_affinity_rules(node=node)
self.assertTrue(isinstance(rules, list))
self.assertEqual(len(rules), 2)
self.assertTrue(isinstance(rules[0].id, str))
self.assertTrue(isinstance(rules[0].node_list, list))
def test_ex_list_anti_affinity_rules_PAGINATED(self):
net_domain = self.driver.ex_list_network_domains()[0]
DimensionDataMockHttp.type = "PAGINATED"
rules = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)
self.assertTrue(isinstance(rules, list))
self.assertEqual(len(rules), 4)
self.assertTrue(isinstance(rules[0].id, str))
self.assertTrue(isinstance(rules[0].node_list, list))
def test_ex_list_anti_affinity_rules_ALLFILTERS(self):
net_domain = self.driver.ex_list_network_domains()[0]
DimensionDataMockHttp.type = "ALLFILTERS"
rules = self.driver.ex_list_anti_affinity_rules(
network_domain=net_domain, filter_id="FAKE_ID", filter_state="FAKE_STATE"
)
self.assertTrue(isinstance(rules, list))
self.assertEqual(len(rules), 2)
self.assertTrue(isinstance(rules[0].id, str))
self.assertTrue(isinstance(rules[0].node_list, list))
def test_ex_list_anti_affinity_rules_BAD_ARGS(self):
with self.assertRaises(ValueError):
self.driver.ex_list_anti_affinity_rules(
network="fake_network", network_domain="fake_network_domain"
)
def test_ex_create_tag_key(self):
success = self.driver.ex_create_tag_key("MyTestKey")
self.assertTrue(success)
def test_ex_create_tag_key_ALLPARAMS(self):
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "ALLPARAMS"
success = self.driver.ex_create_tag_key(
"MyTestKey",
description="Test Key Desc.",
value_required=False,
display_on_report=False,
)
self.assertTrue(success)
def test_ex_create_tag_key_BADREQUEST(self):
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "BADREQUEST"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_create_tag_key("MyTestKey")
def test_ex_list_tag_keys(self):
tag_keys = self.driver.ex_list_tag_keys()
self.assertTrue(isinstance(tag_keys, list))
self.assertTrue(isinstance(tag_keys[0], DimensionDataTagKey))
self.assertTrue(isinstance(tag_keys[0].id, str))
def test_ex_list_tag_keys_ALLFILTERS(self):
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "ALLFILTERS"
self.driver.ex_list_tag_keys(
id="fake_id",
name="fake_name",
value_required=False,
display_on_report=False,
)
def test_ex_get_tag_by_id(self):
tag = self.driver.ex_get_tag_key_by_id("d047c609-93d7-4bc5-8fc9-732c85840075")
self.assertTrue(isinstance(tag, DimensionDataTagKey))
def test_ex_get_tag_by_id_NOEXIST(self):
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "NOEXIST"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_get_tag_key_by_id("d047c609-93d7-4bc5-8fc9-732c85840075")
def test_ex_get_tag_by_name(self):
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "SINGLE"
tag = self.driver.ex_get_tag_key_by_name("LibcloudTest")
self.assertTrue(isinstance(tag, DimensionDataTagKey))
def test_ex_get_tag_by_name_NOEXIST(self):
with self.assertRaises(ValueError):
self.driver.ex_get_tag_key_by_name("LibcloudTest")
def test_ex_modify_tag_key_NAME(self):
tag_key = self.driver.ex_list_tag_keys()[0]
DimensionDataMockHttp.type = "NAME"
success = self.driver.ex_modify_tag_key(tag_key, name="NewName")
self.assertTrue(success)
def test_ex_modify_tag_key_NOTNAME(self):
tag_key = self.driver.ex_list_tag_keys()[0]
DimensionDataMockHttp.type = "NOTNAME"
success = self.driver.ex_modify_tag_key(
tag_key, description="NewDesc", value_required=False, display_on_report=True
)
self.assertTrue(success)
def test_ex_modify_tag_key_NOCHANGE(self):
tag_key = self.driver.ex_list_tag_keys()[0]
DimensionDataMockHttp.type = "NOCHANGE"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_modify_tag_key(tag_key)
def test_ex_remove_tag_key(self):
tag_key = self.driver.ex_list_tag_keys()[0]
success = self.driver.ex_remove_tag_key(tag_key)
self.assertTrue(success)
def test_ex_remove_tag_key_NOEXIST(self):
tag_key = self.driver.ex_list_tag_keys()[0]
DimensionDataMockHttp.type = "NOEXIST"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_remove_tag_key(tag_key)
def test_ex_apply_tag_to_asset(self):
node = self.driver.list_nodes()[0]
success = self.driver.ex_apply_tag_to_asset(node, "TagKeyName", "FakeValue")
self.assertTrue(success)
def test_ex_apply_tag_to_asset_NOVALUE(self):
node = self.driver.list_nodes()[0]
DimensionDataMockHttp.type = "NOVALUE"
success = self.driver.ex_apply_tag_to_asset(node, "TagKeyName")
self.assertTrue(success)
def test_ex_apply_tag_to_asset_NOTAGKEY(self):
node = self.driver.list_nodes()[0]
DimensionDataMockHttp.type = "NOTAGKEY"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_apply_tag_to_asset(node, "TagKeyNam")
def test_ex_apply_tag_to_asset_BADASSETTYPE(self):
network = self.driver.list_networks()[0]
DimensionDataMockHttp.type = "NOTAGKEY"
with self.assertRaises(TypeError):
self.driver.ex_apply_tag_to_asset(network, "TagKeyNam")
def test_ex_remove_tag_from_asset(self):
node = self.driver.list_nodes()[0]
success = self.driver.ex_remove_tag_from_asset(node, "TagKeyName")
self.assertTrue(success)
def test_ex_remove_tag_from_asset_NOTAG(self):
node = self.driver.list_nodes()[0]
DimensionDataMockHttp.type = "NOTAG"
with self.assertRaises(DimensionDataAPIException):
self.driver.ex_remove_tag_from_asset(node, "TagKeyNam")
def test_ex_list_tags(self):
tags = self.driver.ex_list_tags()
self.assertTrue(isinstance(tags, list))
self.assertTrue(isinstance(tags[0], DimensionDataTag))
self.assertTrue(len(tags) == 3)
def test_ex_list_tags_ALLPARAMS(self):
self.driver.connection._get_orgId()
DimensionDataMockHttp.type = "ALLPARAMS"
tags = self.driver.ex_list_tags(
asset_id="fake_asset_id",
asset_type="fake_asset_type",
location="fake_location",
tag_key_name="fake_tag_key_name",
tag_key_id="fake_tag_key_id",
value="fake_value",
value_required=False,
display_on_report=False,
)
self.assertTrue(isinstance(tags, list))
self.assertTrue(isinstance(tags[0], DimensionDataTag))
self.assertTrue(len(tags) == 3)
def test_priv_location_to_location_id(self):
location = self.driver.ex_get_location_by_id("NA9")
self.assertEqual(self.driver._location_to_location_id(location), "NA9")
def test_priv_location_to_location_id_STR(self):
self.assertEqual(self.driver._location_to_location_id("NA9"), "NA9")
def test_priv_location_to_location_id_TYPEERROR(self):
with self.assertRaises(TypeError):
self.driver._location_to_location_id([1, 2, 3])
def test_priv_image_needs_auth_os_img(self):
image = self.driver.list_images()[1]
self.assertTrue(self.driver._image_needs_auth(image))
def test_priv_image_needs_auth_os_img_STR(self):
image = self.driver.list_images()[1].id
self.assertTrue(self.driver._image_needs_auth(image))
def test_priv_image_needs_auth_cust_img_windows(self):
image = self.driver.ex_list_customer_images()[1]
self.assertTrue(self.driver._image_needs_auth(image))
def test_priv_image_needs_auth_cust_img_windows_STR(self):
image = self.driver.ex_list_customer_images()[1].id
self.assertTrue(self.driver._image_needs_auth(image))
def test_priv_image_needs_auth_cust_img_linux(self):
image = self.driver.ex_list_customer_images()[0]
self.assertTrue(not self.driver._image_needs_auth(image))
def test_priv_image_needs_auth_cust_img_linux_STR(self):
image = self.driver.ex_list_customer_images()[0].id
self.assertTrue(not self.driver._image_needs_auth(image))
def test_summary_usage_report(self):
report = self.driver.ex_summary_usage_report("2016-06-01", "2016-06-30")
report_content = report
self.assertEqual(len(report_content), 13)
self.assertEqual(len(report_content[0]), 6)
def test_detailed_usage_report(self):
report = self.driver.ex_detailed_usage_report("2016-06-01", "2016-06-30")
report_content = report
self.assertEqual(len(report_content), 42)
self.assertEqual(len(report_content[0]), 4)
def test_audit_log_report(self):
report = self.driver.ex_audit_log_report("2016-06-01", "2016-06-30")
report_content = report
self.assertEqual(len(report_content), 25)
self.assertEqual(report_content[2][2], "OEC_SYSTEM")
def test_ex_list_ip_address_list(self):
net_domain = self.driver.ex_list_network_domains()[0]
ip_list = self.driver.ex_list_ip_address_list(ex_network_domain=net_domain)
self.assertTrue(isinstance(ip_list, list))
self.assertEqual(len(ip_list), 4)
self.assertTrue(isinstance(ip_list[0].name, str))
self.assertTrue(isinstance(ip_list[0].description, str))
self.assertTrue(isinstance(ip_list[0].ip_version, str))
self.assertTrue(isinstance(ip_list[0].state, str))
self.assertTrue(isinstance(ip_list[0].create_time, str))
self.assertTrue(isinstance(ip_list[0].child_ip_address_lists, list))
self.assertEqual(len(ip_list[1].child_ip_address_lists), 1)
self.assertTrue(isinstance(ip_list[1].child_ip_address_lists[0].name, str))
def test_ex_get_ip_address_list(self):
net_domain = self.driver.ex_list_network_domains()[0]
DimensionDataMockHttp.type = "FILTERBYNAME"
ip_list = self.driver.ex_get_ip_address_list(
ex_network_domain=net_domain.id,
ex_ip_address_list_name="Test_IP_Address_List_3",
)
self.assertTrue(isinstance(ip_list, list))
self.assertEqual(len(ip_list), 1)
self.assertTrue(isinstance(ip_list[0].name, str))
self.assertTrue(isinstance(ip_list[0].description, str))
self.assertTrue(isinstance(ip_list[0].ip_version, str))
self.assertTrue(isinstance(ip_list[0].state, str))
self.assertTrue(isinstance(ip_list[0].create_time, str))
ips = ip_list[0].ip_address_collection
self.assertEqual(len(ips), 3)
self.assertTrue(isinstance(ips[0].begin, str))
self.assertTrue(isinstance(ips[0].prefix_size, str))
self.assertTrue(isinstance(ips[2].end, str))
def test_ex_create_ip_address_list_FAIL(self):
net_domain = self.driver.ex_list_network_domains()[0]
with self.assertRaises(TypeError):
self.driver.ex_create_ip_address_list(ex_network_domain=net_domain.id)
def test_ex_create_ip_address_list(self):
name = "Test_IP_Address_List_3"
description = "Test Description"
ip_version = "IPV4"
child_ip_address_list_id = "0291ef78-4059-4bc1-b433-3f6ad698dc41"
child_ip_address_list = DimensionDataChildIpAddressList(
id=child_ip_address_list_id, name="test_child_ip_addr_list"
)
net_domain = self.driver.ex_list_network_domains()[0]
ip_address_1 = DimensionDataIpAddress(begin="190.2.2.100")
ip_address_2 = DimensionDataIpAddress(begin="190.2.2.106", end="190.2.2.108")
ip_address_3 = DimensionDataIpAddress(begin="190.2.2.0", prefix_size="24")
ip_address_collection = [ip_address_1, ip_address_2, ip_address_3]
# Create IP Address List
success = self.driver.ex_create_ip_address_list(
ex_network_domain=net_domain,
name=name,
ip_version=ip_version,
description=description,
ip_address_collection=ip_address_collection,
child_ip_address_list=child_ip_address_list,
)
self.assertTrue(success)
def test_ex_create_ip_address_list_STR(self):
name = "Test_IP_Address_List_3"
description = "Test Description"
ip_version = "IPV4"
child_ip_address_list_id = "0291ef78-4059-4bc1-b433-3f6ad698dc41"
net_domain = self.driver.ex_list_network_domains()[0]
ip_address_1 = DimensionDataIpAddress(begin="190.2.2.100")
ip_address_2 = DimensionDataIpAddress(begin="190.2.2.106", end="190.2.2.108")
ip_address_3 = DimensionDataIpAddress(begin="190.2.2.0", prefix_size="24")
ip_address_collection = [ip_address_1, ip_address_2, ip_address_3]
# Create IP Address List
success = self.driver.ex_create_ip_address_list(
ex_network_domain=net_domain.id,
name=name,
ip_version=ip_version,
description=description,
ip_address_collection=ip_address_collection,
child_ip_address_list=child_ip_address_list_id,
)
self.assertTrue(success)
def test_ex_edit_ip_address_list(self):
ip_address_1 = DimensionDataIpAddress(begin="190.2.2.111")
ip_address_collection = [ip_address_1]
child_ip_address_list = DimensionDataChildIpAddressList(
id="2221ef78-4059-4bc1-b433-3f6ad698dc41",
name="test_child_ip_address_list edited",
)
ip_address_list = DimensionDataIpAddressList(
id="1111ef78-4059-4bc1-b433-3f6ad698d111",
name="test ip address list edited",
ip_version="IPv4",
description="test",
ip_address_collection=ip_address_collection,
child_ip_address_lists=child_ip_address_list,
state="NORMAL",
create_time="2015-09-29T02:49:45",
)
success = self.driver.ex_edit_ip_address_list(
ex_ip_address_list=ip_address_list,
description="test ip address list",
ip_address_collection=ip_address_collection,
child_ip_address_lists=child_ip_address_list,
)
self.assertTrue(success)
def test_ex_edit_ip_address_list_STR(self):
ip_address_1 = DimensionDataIpAddress(begin="190.2.2.111")
ip_address_collection = [ip_address_1]
child_ip_address_list = DimensionDataChildIpAddressList(
id="2221ef78-4059-4bc1-b433-3f6ad698dc41",
name="test_child_ip_address_list edited",
)
success = self.driver.ex_edit_ip_address_list(
ex_ip_address_list="84e34850-595d- 436e-a885-7cd37edb24a4",
description="test ip address list",
ip_address_collection=ip_address_collection,
child_ip_address_lists=child_ip_address_list,
)
self.assertTrue(success)
def test_ex_delete_ip_address_list(self):
child_ip_address_list = DimensionDataChildIpAddressList(
id="2221ef78-4059-4bc1-b433-3f6ad698dc41",
name="test_child_ip_address_list edited",
)
ip_address_list = DimensionDataIpAddressList(
id="1111ef78-4059-4bc1-b433-3f6ad698d111",
name="test ip address list edited",
ip_version="IPv4",
description="test",
ip_address_collection=None,
child_ip_address_lists=child_ip_address_list,
state="NORMAL",
create_time="2015-09-29T02:49:45",
)
success = self.driver.ex_delete_ip_address_list(
ex_ip_address_list=ip_address_list
)
self.assertTrue(success)
def test_ex_delete_ip_address_list_STR(self):
success = self.driver.ex_delete_ip_address_list(
ex_ip_address_list="111ef78-4059-4bc1-b433-3f6ad698d111"
)
self.assertTrue(success)
def test_ex_list_portlist(self):
net_domain = self.driver.ex_list_network_domains()[0]
portlist = self.driver.ex_list_portlist(ex_network_domain=net_domain)
self.assertTrue(isinstance(portlist, list))
self.assertEqual(len(portlist), 3)
self.assertTrue(isinstance(portlist[0].name, str))
self.assertTrue(isinstance(portlist[0].description, str))
self.assertTrue(isinstance(portlist[0].state, str))
self.assertTrue(isinstance(portlist[0].port_collection, list))
self.assertTrue(isinstance(portlist[0].port_collection[0].begin, str))
self.assertTrue(isinstance(portlist[0].port_collection[0].end, str))
self.assertTrue(isinstance(portlist[0].child_portlist_list, list))
self.assertTrue(isinstance(portlist[0].child_portlist_list[0].id, str))
self.assertTrue(isinstance(portlist[0].child_portlist_list[0].name, str))
self.assertTrue(isinstance(portlist[0].create_time, str))
def test_ex_get_port_list(self):
net_domain = self.driver.ex_list_network_domains()[0]
portlist_id = self.driver.ex_list_portlist(ex_network_domain=net_domain)[0].id
portlist = self.driver.ex_get_portlist(ex_portlist_id=portlist_id)
self.assertTrue(isinstance(portlist, DimensionDataPortList))
self.assertTrue(isinstance(portlist.name, str))
self.assertTrue(isinstance(portlist.description, str))
self.assertTrue(isinstance(portlist.state, str))
self.assertTrue(isinstance(portlist.port_collection, list))
self.assertTrue(isinstance(portlist.port_collection[0].begin, str))
self.assertTrue(isinstance(portlist.port_collection[0].end, str))
self.assertTrue(isinstance(portlist.child_portlist_list, list))
self.assertTrue(isinstance(portlist.child_portlist_list[0].id, str))
self.assertTrue(isinstance(portlist.child_portlist_list[0].name, str))
self.assertTrue(isinstance(portlist.create_time, str))
def test_ex_get_portlist_STR(self):
net_domain = self.driver.ex_list_network_domains()[0]
portlist = self.driver.ex_list_portlist(ex_network_domain=net_domain)[0]
port_list = self.driver.ex_get_portlist(ex_portlist_id=portlist.id)
self.assertTrue(isinstance(port_list, DimensionDataPortList))
self.assertTrue(isinstance(port_list.name, str))
self.assertTrue(isinstance(port_list.description, str))
self.assertTrue(isinstance(port_list.state, str))
self.assertTrue(isinstance(port_list.port_collection, list))
self.assertTrue(isinstance(port_list.port_collection[0].begin, str))
self.assertTrue(isinstance(port_list.port_collection[0].end, str))
self.assertTrue(isinstance(port_list.child_portlist_list, list))
self.assertTrue(isinstance(port_list.child_portlist_list[0].id, str))
self.assertTrue(isinstance(port_list.child_portlist_list[0].name, str))
self.assertTrue(isinstance(port_list.create_time, str))
def test_ex_create_portlist_NOCHILDPORTLIST(self):
name = "Test_Port_List"
description = "Test Description"
net_domain = self.driver.ex_list_network_domains()[0]
port_1 = DimensionDataPort(begin="8080")
port_2 = DimensionDataIpAddress(begin="8899", end="9023")
port_collection = [port_1, port_2]
# Create IP Address List
success = self.driver.ex_create_portlist(
ex_network_domain=net_domain,
name=name,
description=description,
port_collection=port_collection,
)
self.assertTrue(success)
def test_ex_create_portlist(self):
name = "Test_Port_List"
description = "Test Description"
net_domain = self.driver.ex_list_network_domains()[0]
port_1 = DimensionDataPort(begin="8080")
port_2 = DimensionDataIpAddress(begin="8899", end="9023")
port_collection = [port_1, port_2]
child_port_1 = DimensionDataChildPortList(
id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
)
child_port_2 = DimensionDataChildPortList(
id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
)
child_ports = [child_port_1, child_port_2]
# Create IP Address List
success = self.driver.ex_create_portlist(
ex_network_domain=net_domain,
name=name,
description=description,
port_collection=port_collection,
child_portlist_list=child_ports,
)
self.assertTrue(success)
def test_ex_create_portlist_STR(self):
name = "Test_Port_List"
description = "Test Description"
net_domain = self.driver.ex_list_network_domains()[0]
port_1 = DimensionDataPort(begin="8080")
port_2 = DimensionDataIpAddress(begin="8899", end="9023")
port_collection = [port_1, port_2]
child_port_1 = DimensionDataChildPortList(
id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
)
child_port_2 = DimensionDataChildPortList(
id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
)
child_ports_ids = [child_port_1.id, child_port_2.id]
# Create IP Address List
success = self.driver.ex_create_portlist(
ex_network_domain=net_domain.id,
name=name,
description=description,
port_collection=port_collection,
child_portlist_list=child_ports_ids,
)
self.assertTrue(success)
def test_ex_edit_portlist(self):
net_domain = self.driver.ex_list_network_domains()[0]
portlist = self.driver.ex_list_portlist(net_domain)[0]
description = "Test Description"
port_1 = DimensionDataPort(begin="8080")
port_2 = DimensionDataIpAddress(begin="8899", end="9023")
port_collection = [port_1, port_2]
child_port_1 = DimensionDataChildPortList(
id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
)
child_port_2 = DimensionDataChildPortList(
id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
)
child_ports = [child_port_1.id, child_port_2.id]
# Create IP Address List
success = self.driver.ex_edit_portlist(
ex_portlist=portlist,
description=description,
port_collection=port_collection,
child_portlist_list=child_ports,
)
self.assertTrue(success)
def test_ex_edit_portlist_STR(self):
portlist_id = "484174a2-ae74-4658-9e56-50fc90e086cf"
description = "Test Description"
port_1 = DimensionDataPort(begin="8080")
port_2 = DimensionDataIpAddress(begin="8899", end="9023")
port_collection = [port_1, port_2]
child_port_1 = DimensionDataChildPortList(
id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
)
child_port_2 = DimensionDataChildPortList(
id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
)
child_ports_ids = [child_port_1.id, child_port_2.id]
# Create IP Address List
success = self.driver.ex_edit_portlist(
ex_portlist=portlist_id,
description=description,
port_collection=port_collection,
child_portlist_list=child_ports_ids,
)
self.assertTrue(success)
def test_ex_delete_portlist(self):
net_domain = self.driver.ex_list_network_domains()[0]
portlist = self.driver.ex_list_portlist(net_domain)[0]
success = self.driver.ex_delete_portlist(ex_portlist=portlist)
self.assertTrue(success)
def test_ex_delete_portlist_STR(self):
net_domain = self.driver.ex_list_network_domains()[0]
portlist = self.driver.ex_list_portlist(net_domain)[0]
success = self.driver.ex_delete_portlist(ex_portlist=portlist.id)
self.assertTrue(success)
def test_import_image(self):
tag_dictionaries = {"tagkey1_name": "dev test", "tagkey2_name": None}
success = self.driver.import_image(
ovf_package_name="aTestGocToNGoc2_export2.mf",
name="Libcloud NGOCImage_New 2",
description="test",
cluster_id="QA1_N2_VMWARE_1-01",
is_guest_os_customization="false",
tagkey_name_value_dictionaries=tag_dictionaries,
)
self.assertTrue(success)
def test_import_image_error_too_many_choice(self):
tag_dictionaries = {"tagkey1_name": "dev test", "tagkey2_name": None}
with self.assertRaises(ValueError):
self.driver.import_image(
ovf_package_name="aTestGocToNGoc2_export2.mf",
name="Libcloud NGOCImage_New 2",
description="test",
cluster_id="QA1_N2_VMWARE_1-01",
datacenter_id="QA1_N1_VMWARE_1",
is_guest_os_customization="false",
tagkey_name_value_dictionaries=tag_dictionaries,
)
def test_import_image_error_missing_choice(self):
tag_dictionaries = {"tagkey1_name": "dev test", "tagkey2_name": None}
with self.assertRaises(ValueError):
self.driver.import_image(
ovf_package_name="aTestGocToNGoc2_export2.mf",
name="Libcloud NGOCImage_New 2",
description="test",
cluster_id=None,
datacenter_id=None,
is_guest_os_customization="false",
tagkey_name_value_dictionaries=tag_dictionaries,
)
def test_exchange_nic_vlans(self):
success = self.driver.ex_exchange_nic_vlans(
nic_id_1="a4b4b42b-ccb5-416f-b052-ce7cb7fdff12",
nic_id_2="b39d09b8-ea65-424a-8fa6-c6f5a98afc69",
)
self.assertTrue(success)
def test_change_nic_network_adapter(self):
success = self.driver.ex_change_nic_network_adapter(
nic_id="0c55c269-20a5-4fec-8054-22a245a48fe4", network_adapter_name="E1000"
)
self.assertTrue(success)
def test_ex_create_node_uncustomized_mcp2_using_vlan(self):
# Get VLAN
vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
# Create node using vlan instead of private IPv4
node = self.driver.ex_create_node_uncustomized(
name="test_server_05",
image="fake_customer_image",
ex_network_domain="fakenetworkdomain",
ex_is_started=False,
ex_description=None,
ex_cluster_id=None,
ex_cpu_specification=None,
ex_memory_gb=None,
ex_primary_nic_private_ipv4=None,
ex_primary_nic_vlan=vlan,
ex_primary_nic_network_adapter=None,
ex_additional_nics=None,
ex_disks=None,
ex_tagid_value_pairs=None,
ex_tagname_value_pairs=None,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
def test_ex_create_node_uncustomized_mcp2_using_ipv4(self):
node = self.driver.ex_create_node_uncustomized(
name="test_server_05",
image="fake_customer_image",
ex_network_domain="fakenetworkdomain",
ex_is_started=False,
ex_description=None,
ex_cluster_id=None,
ex_cpu_specification=None,
ex_memory_gb=None,
ex_primary_nic_private_ipv4="10.0.0.1",
ex_primary_nic_vlan=None,
ex_primary_nic_network_adapter=None,
ex_additional_nics=None,
ex_disks=None,
ex_tagid_value_pairs=None,
ex_tagname_value_pairs=None,
)
self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
class InvalidRequestError(Exception):
def __init__(self, tag):
super(InvalidRequestError, self).__init__("Invalid Request - %s" % tag)
class DimensionDataMockHttp(MockHttp):
fixtures = ComputeFileFixtures("dimensiondata")
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_report_usage(
self, method, url, body, headers
):
body = self.fixtures.load("summary_usage_report.csv")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_report_usageDetailed(
self, method, url, body, headers
):
body = self.fixtures.load("detailed_usage_report.csv")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_auditlog(
self, method, url, body, headers
):
body = self.fixtures.load("audit_log.csv")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_UNAUTHORIZED(self, method, url, body, headers):
return (httplib.UNAUTHORIZED, "", {}, httplib.responses[httplib.UNAUTHORIZED])
def _oec_0_9_myaccount(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_INPROGRESS(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_PAGINATED(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_ALLFILTERS(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_base_image(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_base_image.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_base_imageWithDiskSpeed(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_base_imageWithDiskSpeed.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployed(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployed.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_pendingDeploy(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_pendingDeploy.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_datacenter(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_datacenter.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11(
self, method, url, body, headers
):
body = None
action = url.split("?")[-1]
if action == "restart":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_restart.xml"
)
elif action == "shutdown":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_shutdown.xml"
)
elif action == "delete":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_delete.xml"
)
elif action == "start":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_start.xml"
)
elif action == "poweroff":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_poweroff.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_INPROGRESS(
self, method, url, body, headers
):
body = None
action = url.split("?")[-1]
if action == "restart":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_restart_INPROGRESS.xml"
)
elif action == "shutdown":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_shutdown_INPROGRESS.xml"
)
elif action == "delete":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_delete_INPROGRESS.xml"
)
elif action == "start":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_start_INPROGRESS.xml"
)
elif action == "poweroff":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_poweroff_INPROGRESS.xml"
)
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server(
self, method, url, body, headers
):
body = self.fixtures.load(
"_oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation(
self, method, url, body, headers
):
if method == "POST":
request = ET.fromstring(body)
if (
request.tag
!= "{http://oec.api.opsource.net/schemas/network}NewNetworkWithLocation"
):
raise InvalidRequestError(request.tag)
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation_NA9(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_4bba37be_506f_11e3_b29c_001517c4643e(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_4bba37be_506f_11e3_b29c_001517c4643e.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSize(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSize.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSpeed(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSpeed.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1(
self, method, url, body, headers
):
action = url.split("?")[-1]
if action == "delete":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87(
self, method, url, body, headers
):
if method == "GET":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
if method == "POST":
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_POST.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_create.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_FAIL_EXISTING(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_create_FAIL.xml"
)
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_07e3621a_a920_4a9a_943c_d8021f27f418(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_delete.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_07e3621a_a920_4a9a_943c_d8021f27f418_FAIL(
self, method, url, body, headers
):
body = self.fixtures.load(
"oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_delete_FAIL.xml"
)
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server(
self, method, url, body, headers
):
body = self.fixtures.load("server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_deleteServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_deleteServer_RESOURCEBUSY.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_rebootServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}rebootServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_rebootServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_rebootServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}rebootServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_rebootServer_RESOURCEBUSY.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server(
self, method, url, body, headers
):
if url.endswith("datacenterId=NA3"):
body = self.fixtures.load("2.4/server_server_NA3.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
body = self.fixtures.load("2.4/server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGESIZE50(
self, method, url, body, headers
):
if not url.endswith("pageSize=50"):
raise ValueError("pageSize is not set as expected")
body = self.fixtures.load("2.4/server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_EMPTY(
self, method, url, body, headers
):
body = self.fixtures.load("server_server_paginated_empty.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGED_THEN_EMPTY(
self, method, url, body, headers
):
if "pageNumber=2" in url:
body = self.fixtures.load("server_server_paginated_empty.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
else:
body = self.fixtures.load("2.4/server_server_paginated.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGINATED(
self, method, url, body, headers
):
if "pageNumber=2" in url:
body = self.fixtures.load("2.4/server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
else:
body = self.fixtures.load("2.4/server_server_paginated.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGINATEDEMPTY(
self, method, url, body, headers
):
body = self.fixtures.load("server_server_paginated_empty.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "datacenterId":
assert value == "fake_loc"
elif key == "networkId":
assert value == "fake_network"
elif key == "networkDomainId":
assert value == "fake_network_domain"
elif key == "vlanId":
assert value == "fake_vlan"
elif key == "ipv6":
assert value == "fake_ipv6"
elif key == "privateIpv4":
assert value == "fake_ipv4"
elif key == "name":
assert value == "fake_name"
elif key == "state":
assert value == "fake_state"
elif key == "started":
assert value == "True"
elif key == "deployed":
assert value == "True"
elif key == "sourceImageId":
assert value == "fake_image"
else:
raise ValueError(
"Could not find in url parameters {0}:{1}".format(key, value)
)
body = self.fixtures.load("2.4/server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule(
self, method, url, body, headers
):
body = self.fixtures.load("server_antiAffinityRule_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "id":
assert value == "FAKE_ID"
elif key == "state":
assert value == "FAKE_STATE"
elif key == "pageSize":
assert value == "250"
elif key == "networkDomainId":
pass
else:
raise ValueError(
"Could not find in url parameters {0}:{1}".format(key, value)
)
body = self.fixtures.load("server_antiAffinityRule_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule_PAGINATED(
self, method, url, body, headers
):
if "pageNumber=2" in url:
body = self.fixtures.load("server_antiAffinityRule_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
else:
body = self.fixtures.load("server_antiAffinityRule_list_PAGINATED.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_datacenter(
self, method, url, body, headers
):
if url.endswith("id=NA9"):
body = self.fixtures.load("infrastructure_datacenter_NA9.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
body = self.fixtures.load("infrastructure_datacenter.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_datacenter_ALLFILTERS(
self, method, url, body, headers
):
if url.endswith("id=NA9"):
body = self.fixtures.load("infrastructure_datacenter_NA9.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
body = self.fixtures.load("infrastructure_datacenter.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_updateVmwareTools(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}updateVmwareTools":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_updateVmwareTools.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_startServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}startServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_startServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_startServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}startServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_startServer_INPROGRESS.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_shutdownServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}shutdownServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_shutdownServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_shutdownServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}shutdownServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_shutdownServer_INPROGRESS.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_resetServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}resetServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_resetServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_powerOffServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}powerOffServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_powerOffServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_powerOffServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}powerOffServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_powerOffServer_INPROGRESS.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_11_INPROGRESS(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/server_GetServer.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain(
self, method, url, body, headers
):
body = self.fixtures.load("network_networkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "datacenterId":
assert value == "fake_location"
elif key == "type":
assert value == "fake_plan"
elif key == "name":
assert value == "fake_name"
elif key == "state":
assert value == "fake_state"
else:
raise ValueError(
"Could not find in url parameters {0}:{1}".format(key, value)
)
body = self.fixtures.load("network_networkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan(
self, method, url, body, headers
):
body = self.fixtures.load("network_vlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "datacenterId":
assert value == "fake_location"
elif key == "networkDomainId":
assert value == "fake_network_domain"
elif key == "ipv6Address":
assert value == "fake_ipv6"
elif key == "privateIpv4Address":
assert value == "fake_ipv4"
elif key == "name":
assert value == "fake_name"
elif key == "state":
assert value == "fake_state"
else:
raise ValueError(
"Could not find in url parameters {0}:{1}".format(key, value)
)
body = self.fixtures.load("network_vlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deployServer":
raise InvalidRequestError(request.tag)
# Make sure the we either have a network tag with an IP or networkId
# Or Network info with a primary nic that has privateip or vlanid
network = request.find(fixxpath("network", TYPES_URN))
network_info = request.find(fixxpath("networkInfo", TYPES_URN))
if network is not None:
if network_info is not None:
raise InvalidRequestError("Request has both MCP1 and MCP2 values")
ipv4 = findtext(network, "privateIpv4", TYPES_URN)
networkId = findtext(network, "networkId", TYPES_URN)
if ipv4 is None and networkId is None:
raise InvalidRequestError(
"Invalid request MCP1 requests need privateIpv4 or networkId"
)
elif network_info is not None:
if network is not None:
raise InvalidRequestError("Request has both MCP1 and MCP2 values")
primary_nic = network_info.find(fixxpath("primaryNic", TYPES_URN))
ipv4 = findtext(primary_nic, "privateIpv4", TYPES_URN)
vlanId = findtext(primary_nic, "vlanId", TYPES_URN)
if ipv4 is None and vlanId is None:
raise InvalidRequestError(
"Invalid request MCP2 requests need privateIpv4 or vlanId"
)
else:
raise InvalidRequestError(
"Invalid request, does not have network or network_info in XML"
)
body = self.fixtures.load("server_deployServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_e75ead52_692f_4314_8725_c8a4f4d13a87(
self, method, url, body, headers
):
body = self.fixtures.load(
"2.4/server_server_e75ead52_692f_4314_8725_c8a4f4d13a87.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deployNetworkDomain(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deployNetworkDomain":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deployNetworkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be_ALLFILTERS(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editNetworkDomain(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editNetworkDomain":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_editNetworkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteNetworkDomain(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteNetworkDomain":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteNetworkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deployVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deployVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deployVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan_0e56433f_d808_4669_821d_812769517ff8(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_vlan_0e56433f_d808_4669_821d_812769517ff8.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_editVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_expandVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}expandVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_expandVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_addPublicIpBlock(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}addPublicIpBlock":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_addPublicIpBlock.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock_4487241a_f0ca_11e3_9315_d4bed9b167ba(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_publicIpBlock_4487241a_f0ca_11e3_9315_d4bed9b167ba.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock(
self, method, url, body, headers
):
body = self.fixtures.load("network_publicIpBlock.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock_9945dc4a_bdce_11e4_8c14_b8ca3a5d9ef8(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_publicIpBlock_9945dc4a_bdce_11e4_8c14_b8ca3a5d9ef8.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_removePublicIpBlock(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}removePublicIpBlock":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_removePublicIpBlock.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_firewallRule(
self, method, url, body, headers
):
body = self.fixtures.load("network_firewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createFirewallRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createFirewallRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_createFirewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_firewallRule_d0a20f59_77b9_4f28_a63b_e58496b73a6c(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_firewallRule_d0a20f59_77b9_4f28_a63b_e58496b73a6c.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editFirewallRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editFirewallRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_editFirewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteFirewallRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteFirewallRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteFirewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createNatRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createNatRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_createNatRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_natRule(
self, method, url, body, headers
):
body = self.fixtures.load("network_natRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_natRule_2187a636_7ebb_49a1_a2ff_5d617f496dce(
self, method, url, body, headers
):
body = self.fixtures.load(
"network_natRule_2187a636_7ebb_49a1_a2ff_5d617f496dce.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteNatRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteNatRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteNatRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_addNic(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}addNic":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_addNic.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_removeNic(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}removeNic":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_removeNic.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_disableServerMonitoring(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}disableServerMonitoring":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_disableServerMonitoring.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_enableServerMonitoring(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}enableServerMonitoring":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_enableServerMonitoring.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeServerMonitoringPlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}changeServerMonitoringPlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_changeServerMonitoringPlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/image_osImage.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_c14b1a46_2428_44c1_9c1a_b20e6418d08c(
self, method, url, body, headers
):
body = self.fixtures.load(
"2.4/image_osImage_c14b1a46_2428_44c1_9c1a_b20e6418d08c.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_6b4fb0c7_a57b_4f58_b59c_9958f94f971a(
self, method, url, body, headers
):
body = self.fixtures.load(
"2.4/image_osImage_6b4fb0c7_a57b_4f58_b59c_9958f94f971a.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_FAKE_IMAGE_ID(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/image_customerImage.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d(
self, method, url, body, headers
):
body = self.fixtures.load(
"2.4/image_customerImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c(
self, method, url, body, headers
):
body = self.fixtures.load(
"2.4/image_customerImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c.xml"
)
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_FAKE_IMAGE_ID(
self, method, url, body, headers
):
body = self.fixtures.load("image_customerImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_reconfigureServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}reconfigureServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_reconfigureServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_cleanServer(
self, method, url, body, headers
):
body = self.fixtures.load("server_cleanServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_addDisk(
self, method, url, body, headers
):
body = self.fixtures.load("server_addDisk.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_removeDisk(
self, method, url, body, headers
):
body = self.fixtures.load("server_removeDisk.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is None:
raise ValueError("Name must have a value in the request")
if description is not None:
raise ValueError("Default description for a tag should be blank")
if value_required is None or value_required != "true":
raise ValueError("Default valueRequired should be true")
if display_on_report is None or display_on_report != "true":
raise ValueError("Default displayOnReport should be true")
body = self.fixtures.load("tag_createTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey_ALLPARAMS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is None:
raise ValueError("Name must have a value in the request")
if description is None:
raise ValueError("Description should have a value")
if value_required is None or value_required != "false":
raise ValueError("valueRequired should be false")
if display_on_report is None or display_on_report != "false":
raise ValueError("displayOnReport should be false")
body = self.fixtures.load("tag_createTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey_BADREQUEST(
self, method, url, body, headers
):
body = self.fixtures.load("tag_createTagKey_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tagKey_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_SINGLE(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tagKey_list_SINGLE.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "id":
assert value == "fake_id"
elif key == "name":
assert value == "fake_name"
elif key == "valueRequired":
assert value == "false"
elif key == "displayOnReport":
assert value == "false"
elif key == "pageSize":
assert value == "250"
else:
raise ValueError(
"Could not find in url parameters {0}:{1}".format(key, value)
)
body = self.fixtures.load("tag_tagKey_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_d047c609_93d7_4bc5_8fc9_732c85840075(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tagKey_5ab77f5f_5aa9_426f_8459_4eab34e03d54.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_d047c609_93d7_4bc5_8fc9_732c85840075_NOEXIST(
self, method, url, body, headers
):
body = self.fixtures.load(
"tag_tagKey_5ab77f5f_5aa9_426f_8459_4eab34e03d54_BADREQUEST.xml"
)
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NAME(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is None:
raise ValueError("Name must have a value in the request")
if description is not None:
raise ValueError("Description should be empty")
if value_required is not None:
raise ValueError("valueRequired should be empty")
if display_on_report is not None:
raise ValueError("displayOnReport should be empty")
body = self.fixtures.load("tag_editTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NOTNAME(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is not None:
raise ValueError("Name should be empty")
if description is None:
raise ValueError("Description should not be empty")
if value_required is None:
raise ValueError("valueRequired should not be empty")
if display_on_report is None:
raise ValueError("displayOnReport should not be empty")
body = self.fixtures.load("tag_editTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NOCHANGE(
self, method, url, body, headers
):
body = self.fixtures.load("tag_editTagKey_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_deleteTagKey(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteTagKey":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("tag_deleteTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_deleteTagKey_NOEXIST(
self, method, url, body, headers
):
body = self.fixtures.load("tag_deleteTagKey_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}applyTags":
raise InvalidRequestError(request.tag)
asset_type = findtext(request, "assetType", TYPES_URN)
asset_id = findtext(request, "assetId", TYPES_URN)
tag = request.find(fixxpath("tag", TYPES_URN))
tag_key_name = findtext(tag, "tagKeyName", TYPES_URN)
value = findtext(tag, "value", TYPES_URN)
if asset_type is None:
raise ValueError("assetType should not be empty")
if asset_id is None:
raise ValueError("assetId should not be empty")
if tag_key_name is None:
raise ValueError("tagKeyName should not be empty")
if value is None:
raise ValueError("value should not be empty")
body = self.fixtures.load("tag_applyTags.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags_NOVALUE(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}applyTags":
raise InvalidRequestError(request.tag)
asset_type = findtext(request, "assetType", TYPES_URN)
asset_id = findtext(request, "assetId", TYPES_URN)
tag = request.find(fixxpath("tag", TYPES_URN))
tag_key_name = findtext(tag, "tagKeyName", TYPES_URN)
value = findtext(tag, "value", TYPES_URN)
if asset_type is None:
raise ValueError("assetType should not be empty")
if asset_id is None:
raise ValueError("assetId should not be empty")
if tag_key_name is None:
raise ValueError("tagKeyName should not be empty")
if value is not None:
raise ValueError("value should be empty")
body = self.fixtures.load("tag_applyTags.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags_NOTAGKEY(
self, method, url, body, headers
):
body = self.fixtures.load("tag_applyTags_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_removeTags(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}removeTags":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("tag_removeTag.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_removeTags_NOTAG(
self, method, url, body, headers
):
body = self.fixtures.load("tag_removeTag_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tag(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tag_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tag_ALLPARAMS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "assetId":
assert value == "fake_asset_id"
elif key == "assetType":
assert value == "fake_asset_type"
elif key == "valueRequired":
assert value == "false"
elif key == "displayOnReport":
assert value == "false"
elif key == "pageSize":
assert value == "250"
elif key == "datacenterId":
assert value == "fake_location"
elif key == "value":
assert value == "fake_value"
elif key == "tagKeyName":
assert value == "fake_tag_key_name"
elif key == "tagKeyId":
assert value == "fake_tag_key_id"
else:
raise ValueError(
"Could not find in url parameters {0}:{1}".format(key, value)
)
body = self.fixtures.load("tag_tag_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_ipAddressList(
self, method, url, body, headers
):
body = self.fixtures.load("ip_address_lists.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_ipAddressList_FILTERBYNAME(
self, method, url, body, headers
):
body = self.fixtures.load("ip_address_lists_FILTERBYNAME.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createIpAddressList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "createIpAddressList":
raise InvalidRequestError(request.tag)
net_domain = findtext(request, "networkDomainId", TYPES_URN)
if net_domain is None:
raise ValueError("Network Domain should not be empty")
name = findtext(request, "name", TYPES_URN)
if name is None:
raise ValueError("Name should not be empty")
ip_version = findtext(request, "ipVersion", TYPES_URN)
if ip_version is None:
raise ValueError("IP Version should not be empty")
ip_address_col_required = findall(request, "ipAddress", TYPES_URN)
child_ip_address_required = findall(request, "childIpAddressListId", TYPES_URN)
if 0 == len(ip_address_col_required) and 0 == len(child_ip_address_required):
raise ValueError(
"At least one ipAddress element or "
"one childIpAddressListId element must be "
"provided."
)
if ip_address_col_required[0].get("begin") is None:
raise ValueError("IP Address should not be empty")
body = self.fixtures.load("ip_address_list_create.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editIpAddressList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "editIpAddressList":
raise InvalidRequestError(request.tag)
ip_address_list = request.get("id")
if ip_address_list is None:
raise ValueError("IpAddressList ID should not be empty")
name = findtext(request, "name", TYPES_URN)
if name is not None:
raise ValueError("Name should not exists in request")
ip_version = findtext(request, "ipVersion", TYPES_URN)
if ip_version is not None:
raise ValueError("IP Version should not exists in request")
ip_address_col_required = findall(request, "ipAddress", TYPES_URN)
child_ip_address_required = findall(request, "childIpAddressListId", TYPES_URN)
if 0 == len(ip_address_col_required) and 0 == len(child_ip_address_required):
raise ValueError(
"At least one ipAddress element or "
"one childIpAddressListId element must be "
"provided."
)
if ip_address_col_required[0].get("begin") is None:
raise ValueError("IP Address should not be empty")
body = self.fixtures.load("ip_address_list_edit.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteIpAddressList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "deleteIpAddressList":
raise InvalidRequestError(request.tag)
ip_address_list = request.get("id")
if ip_address_list is None:
raise ValueError("IpAddressList ID should not be empty")
body = self.fixtures.load("ip_address_list_delete.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_portList(
self, method, url, body, headers
):
body = self.fixtures.load("port_list_lists.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_portList_c8c92ea3_2da8_4d51_8153_f39bec794d69(
self, method, url, body, headers
):
body = self.fixtures.load("port_list_get.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createPortList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "createPortList":
raise InvalidRequestError(request.tag)
net_domain = findtext(request, "networkDomainId", TYPES_URN)
if net_domain is None:
raise ValueError("Network Domain should not be empty")
ports_required = findall(request, "port", TYPES_URN)
child_port_list_required = findall(request, "childPortListId", TYPES_URN)
if 0 == len(ports_required) and 0 == len(child_port_list_required):
raise ValueError(
"At least one port element or one "
"childPortListId element must be provided"
)
if ports_required[0].get("begin") is None:
raise ValueError("PORT begin value should not be empty")
body = self.fixtures.load("port_list_create.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editPortList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "editPortList":
raise InvalidRequestError(request.tag)
ports_required = findall(request, "port", TYPES_URN)
child_port_list_required = findall(request, "childPortListId", TYPES_URN)
if 0 == len(ports_required) and 0 == len(child_port_list_required):
raise ValueError(
"At least one port element or one "
"childPortListId element must be provided"
)
if ports_required[0].get("begin") is None:
raise ValueError("PORT begin value should not be empty")
body = self.fixtures.load("port_list_edit.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deletePortList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "deletePortList":
raise InvalidRequestError(request.tag)
port_list = request.get("id")
if port_list is None:
raise ValueError("Port List ID should not be empty")
body = self.fixtures.load("ip_address_list_delete.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_cloneServer(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/server_clone_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_importImage(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/import_image_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_exchangeNicVlans(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/exchange_nic_vlans_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeNetworkAdapter(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/change_nic_networkadapter_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployUncustomizedServer(
self, method, url, body, headers
):
body = self.fixtures.load("2.4/deploy_customised_server.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
if __name__ == "__main__":
sys.exit(unittest.main())