# blob: f26cea53e5bc65dd2217541ef47693beda44fc9d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from types import GeneratorType
from libcloud.test import MockHttp, unittest
from libcloud.utils.py3 import ET, httplib
from libcloud.utils.xml import findall, findtext, fixxpath
from libcloud.common.types import InvalidCredsError
from libcloud.compute.base import Node, NodeLocation, NodeAuthPassword
from libcloud.test.secrets import DIMENSIONDATA_PARAMS
from libcloud.test.file_fixtures import ComputeFileFixtures
from libcloud.common.dimensiondata import (
TYPES_URN,
DimensionDataTag,
DimensionDataPort,
DimensionDataTagKey,
DimensionDataPortList,
DimensionDataIpAddress,
DimensionDataServerDisk,
NetworkDomainServicePlan,
DimensionDataAPIException,
DimensionDataChildPortList,
DimensionDataIpAddressList,
DimensionDataServerVMWareTools,
DimensionDataChildIpAddressList,
DimensionDataServerCpuSpecification,
)
from libcloud.compute.drivers.dimensiondata import DimensionDataNic
from libcloud.compute.drivers.dimensiondata import DimensionDataNodeDriver as DimensionData
class DimensionData_v2_4_Tests(unittest.TestCase):
def setUp(self):
    # Wire the driver's connection class to the mock HTTP layer, pin the API
    # version to 2.4 and build a fresh driver for every test.
    connection_cls = DimensionData.connectionCls
    connection_cls.active_api_version = "2.4"
    connection_cls.conn_class = DimensionDataMockHttp
    # Individual tests set ``type`` (e.g. "UNAUTHORIZED"); clear it here so a
    # value chosen by an earlier test does not carry over.
    DimensionDataMockHttp.type = None
    self.driver = DimensionData(*DIMENSIONDATA_PARAMS)
def test_invalid_region(self):
    # Building the driver with an unknown region name must raise ValueError.
    self.assertRaises(ValueError, DimensionData, *DIMENSIONDATA_PARAMS, region="blah")
def test_invalid_creds(self):
    # With the mock type set to "UNAUTHORIZED", listing nodes must raise
    # InvalidCredsError.
    DimensionDataMockHttp.type = "UNAUTHORIZED"
    self.assertRaises(InvalidCredsError, self.driver.list_nodes)
def test_get_account_details(self):
    # The account details returned by the connection carry the expected
    # full name, first name and e-mail address.
    DimensionDataMockHttp.type = None
    account = self.driver.connection.get_account_details()
    expected_fields = (
        ("full_name", "Test User"),
        ("first_name", "Test"),
        ("email", "test@example.com"),
    )
    for attribute, expected in expected_fields:
        self.assertEqual(getattr(account, attribute), expected)
def test_list_locations_response(self):
    # list_locations returns five locations; the first one is NA3 / US - West.
    DimensionDataMockHttp.type = None
    locations = self.driver.list_locations()
    self.assertEqual(len(locations), 5)
    head = locations[0]
    self.assertEqual(head.id, "NA3")
    self.assertEqual(head.name, "US - West")
    self.assertEqual(head.country, "US")
def test_list_nodes_response(self):
    # The default mock response for list_nodes contains seven nodes.
    DimensionDataMockHttp.type = None
    nodes = self.driver.list_nodes()
    node_count = len(nodes)
    self.assertEqual(node_count, 7)
def test_node_extras(self):
    # The ``extra`` dict of listed nodes exposes typed helper objects
    # (VMware tools, CPU specification, disks) rather than raw values.
    DimensionDataMockHttp.type = None
    nodes = self.driver.list_nodes()

    first_extra = nodes[0].extra
    # assertIsInstance reports the offending object on failure, unlike
    # assertTrue(isinstance(...)) which only says "False is not true".
    self.assertIsInstance(first_extra["vmWareTools"], DimensionDataServerVMWareTools)
    self.assertIsInstance(first_extra["cpu"], DimensionDataServerCpuSpecification)

    # The first two nodes are each expected to have a 10 GB first disk.
    for node in (nodes[0], nodes[1]):
        disks = node.extra["disks"]
        self.assertIsInstance(disks, list)
        self.assertIsInstance(disks[0], DimensionDataServerDisk)
        self.assertEqual(disks[0].size_gb, 10)
def test_server_states(self):
    # Each of the seven listed nodes must report the expected state, in
    # response order.
    DimensionDataMockHttp.type = None
    nodes = self.driver.list_nodes()
    expected_states = (
        "running",
        "starting",
        "stopping",
        "reconfiguring",
        "running",
        "terminated",
        "stopped",
    )
    # Check the length first so a short response fails with a clear message
    # instead of an IndexError while indexing below.
    self.assertEqual(len(nodes), 7)
    for index, expected in enumerate(expected_states):
        # assertEqual shows the actual state on failure; assertTrue(a == b)
        # would only report "False is not true".
        self.assertEqual(nodes[index].state, expected, "unexpected state for node %d" % index)
def test_list_nodes_response_PAGINATED(self):
    # Under the "PAGINATED" mock type list_nodes returns nine nodes in total.
    DimensionDataMockHttp.type = "PAGINATED"
    nodes = self.driver.list_nodes()
    node_count = len(nodes)
    self.assertEqual(node_count, 9)
def test_paginated_mcp2_call_EMPTY(self):
    # Iterating the paginated request under the "EMPTY" mock type yields no
    # nodes at all.
    # Resolve the org id before switching the mock type (original note:
    # "cache org").
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "EMPTY"
    pages = self.driver.connection.paginated_request_with_orgId_api_2("server/server")
    # Flatten every yielded page into a single list.
    collected = [node for page in pages for node in page]
    # assertEqual reports the actual length on failure.
    self.assertEqual(len(collected), 0)
def test_paginated_mcp2_call_PAGED_THEN_EMPTY(self):
    # Under the "PAGED_THEN_EMPTY" mock type the paginated request yields
    # two nodes in total across all pages.
    # Resolve the org id before switching the mock type (original note:
    # "cache org").
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "PAGED_THEN_EMPTY"
    pages = self.driver.connection.paginated_request_with_orgId_api_2("server/server")
    # Flatten every yielded page into a single list.
    collected = [node for page in pages for node in page]
    # assertEqual reports the actual length on failure.
    self.assertEqual(len(collected), 2)
def test_paginated_mcp2_call_with_page_size(self):
    # Passing ``page_size`` to the paginated request still returns a
    # generator. The generator is never iterated in this test.
    # Resolve the org id before switching the mock type (original note:
    # "cache org").
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "PAGESIZE50"
    pages = self.driver.connection.paginated_request_with_orgId_api_2(
        "server/server", page_size=50
    )
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(pages, GeneratorType)
# Make sure the filters passed to list_nodes make it into the request URL.
# See _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_ALLFILTERS for the asserts.
def test_list_nodes_response_strings_ALLFILTERS(self):
    # Call list_nodes with a full set of ex_* filters given as plain
    # strings/booleans under the "ALLFILTERS" mock type, then inspect the
    # fourth returned node in detail.
    DimensionDataMockHttp.type = "ALLFILTERS"
    nodes = self.driver.list_nodes(
        ex_location="fake_loc",
        ex_name="fake_name",
        ex_ipv6="fake_ipv6",
        ex_ipv4="fake_ipv4",
        ex_vlan="fake_vlan",
        ex_image="fake_image",
        ex_deployed=True,
        ex_started=True,
        ex_state="fake_state",
        ex_network="fake_network",
        ex_network_domain="fake_network_domain",
    )
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(nodes, list)
    self.assertEqual(len(nodes), 7)

    node = nodes[3]
    disks = node.extra["disks"]
    self.assertIsInstance(disks, list)
    self.assertIsInstance(disks[0], DimensionDataServerDisk)
    self.assertEqual(node.size.id, "1")
    self.assertEqual(node.image.id, "3ebf3c0f-90fe-4a8b-8585-6e65b316592c")
    self.assertEqual(node.image.name, "WIN2008S/32")

    disk = disks[0]
    self.assertEqual(disk.id, "c2e1f199-116e-4dbc-9960-68720b832b0a")
    self.assertEqual(disk.scsi_id, 0)
    self.assertEqual(disk.size_gb, 50)
    self.assertEqual(disk.speed, "STANDARD")
    self.assertEqual(disk.state, "NORMAL")
def test_list_nodes_response_LOCATION(self):
    # Filtering list_nodes by a location object only returns nodes whose
    # datacenterId is "NA3".
    DimensionDataMockHttp.type = None
    location = self.driver.list_locations()[0]
    nodes = self.driver.list_nodes(ex_location=location)
    for listed in nodes:
        self.assertEqual(listed.extra["datacenterId"], "NA3")
def test_list_nodes_response_LOCATION_STR(self):
    # Same as the location-object variant, but the location is given as the
    # id string "NA3".
    DimensionDataMockHttp.type = None
    nodes = self.driver.list_nodes(ex_location="NA3")
    for listed in nodes:
        self.assertEqual(listed.extra["datacenterId"], "NA3")
def test_list_sizes_response(self):
    # list_sizes returns exactly one size, named "default".
    DimensionDataMockHttp.type = None
    sizes = self.driver.list_sizes()
    self.assertEqual(len(sizes), 1)
    only_size = sizes[0]
    self.assertEqual(only_size.name, "default")
def test_reboot_node_response(self):
    # Rebooting a node through Node.reboot() returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(node.reboot(), True)
def test_reboot_node_response_INPROGRESS(self):
    # Under the "INPROGRESS" mock type a reboot raises
    # DimensionDataAPIException.
    DimensionDataMockHttp.type = "INPROGRESS"
    node_fields = dict(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    target = Node(**node_fields)
    self.assertRaises(DimensionDataAPIException, target.reboot)
def test_destroy_node_response(self):
    # Destroying a node through Node.destroy() returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(node.destroy(), True)
def test_destroy_node_response_RESOURCE_BUSY(self):
    # Under the "INPROGRESS" mock type a destroy raises
    # DimensionDataAPIException.
    DimensionDataMockHttp.type = "INPROGRESS"
    node_fields = dict(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    target = Node(**node_fields)
    self.assertRaises(DimensionDataAPIException, target.destroy)
def test_list_images(self):
    # list_images returns three images; verify the fields of the first one.
    image_list = self.driver.list_images()
    self.assertEqual(len(image_list), 3)
    first = image_list[0]
    self.assertEqual(first.name, "RedHat 6 64-bit 2 CPU")
    self.assertEqual(first.id, "c14b1a46-2428-44c1-9c1a-b20e6418d08c")
    self.assertEqual(first.extra["location"].id, "NA9")
    self.assertEqual(first.extra["cpu"].cpu_count, 2)
    self.assertEqual(first.extra["OS_displayName"], "REDHAT6/64")
def test_clean_failed_deployment_response_with_node(self):
    # ex_clean_failed_deployment accepts a Node object and returns exactly
    # True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_clean_failed_deployment(node), True)
def test_clean_failed_deployment_response_with_node_id(self):
    # ex_clean_failed_deployment also accepts a bare node id string and
    # returns exactly True.
    node_id = "e75ead52-692f-4314-8725-c8a4f4d13a87"
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_clean_failed_deployment(node_id), True)
def test_ex_list_customer_images(self):
    # ex_list_customer_images returns three images; verify the fields of
    # the first one.
    customer_images = self.driver.ex_list_customer_images()
    self.assertEqual(len(customer_images), 3)
    first = customer_images[0]
    self.assertEqual(first.name, "ImportedCustomerImage")
    self.assertEqual(first.id, "5234e5c7-01de-4411-8b6e-baeb8d91cf5d")
    self.assertEqual(first.extra["location"].id, "NA9")
    self.assertEqual(first.extra["cpu"].cpu_count, 4)
    self.assertEqual(first.extra["OS_displayName"], "REDHAT6/64")
def test_create_mcp1_node_optional_param(self):
    # create_node with ex_network plus the optional memory, disk, CPU and
    # DNS arguments returns the expected node id with a DEPLOY_SERVER
    # status action.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    network = self.driver.ex_list_networks()[0]
    cpu = DimensionDataServerCpuSpecification(
        cpu_count="4", cores_per_socket="2", performance="STANDARD"
    )
    disk_list = [DimensionDataServerDisk(scsi_id="0", speed="HIGHPERFORMANCE")]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network=network,
        ex_is_started=False,
        ex_memory_gb=8,
        ex_disks=disk_list,
        ex_cpu_specification=cpu,
        ex_primary_dns="10.0.0.5",
        ex_secondary_dns="10.0.0.6",
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_mcp1_node_response_no_pass_random_gen(self):
    # With auth=None and the first image from list_images, the created
    # node's ``extra`` must contain a "password" entry.
    image = self.driver.list_images()[0]
    network = self.driver.ex_list_networks()[0]
    node = self.driver.create_node(
        name="test2",
        image=image,
        auth=None,
        ex_description="test2 node",
        ex_network=network,
        ex_is_started=False,
    )
    self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
    # assertIn lists the container contents when the key is missing.
    self.assertIn("password", node.extra)
def test_create_mcp1_node_response_no_pass_customer_windows(self):
    # With auth=None and the second customer image (a Windows image, per
    # the test name), the created node's ``extra`` must contain a
    # "password" entry.
    image = self.driver.ex_list_customer_images()[1]
    network = self.driver.ex_list_networks()[0]
    node = self.driver.create_node(
        name="test2",
        image=image,
        auth=None,
        ex_description="test2 node",
        ex_network=network,
        ex_is_started=False,
    )
    self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
    # assertIn lists the container contents when the key is missing.
    self.assertIn("password", node.extra)
def test_create_mcp1_node_response_no_pass_customer_windows_STR(self):
    # Same as the customer-Windows variant, but the image is passed as its
    # id string instead of an image object.
    image_id = self.driver.ex_list_customer_images()[1].id
    network = self.driver.ex_list_networks()[0]
    node = self.driver.create_node(
        name="test2",
        image=image_id,
        auth=None,
        ex_description="test2 node",
        ex_network=network,
        ex_is_started=False,
    )
    self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
    # assertIn lists the container contents when the key is missing.
    self.assertIn("password", node.extra)
def test_create_mcp1_node_response_no_pass_customer_linux(self):
    # With auth=None and the first customer image (a Linux image, per the
    # test name), the created node's ``extra`` must NOT contain a
    # "password" entry.
    image = self.driver.ex_list_customer_images()[0]
    network = self.driver.ex_list_networks()[0]
    node = self.driver.create_node(
        name="test2",
        image=image,
        auth=None,
        ex_description="test2 node",
        ex_network=network,
        ex_is_started=False,
    )
    self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
    # assertNotIn shows the container contents when the key is present.
    self.assertNotIn("password", node.extra)
def test_create_mcp1_node_response_no_pass_customer_linux_STR(self):
    # Same as the customer-Linux variant, but the image is passed as its id
    # string instead of an image object.
    image_id = self.driver.ex_list_customer_images()[0].id
    network = self.driver.ex_list_networks()[0]
    node = self.driver.create_node(
        name="test2",
        image=image_id,
        auth=None,
        ex_description="test2 node",
        ex_network=network,
        ex_is_started=False,
    )
    self.assertEqual(node.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(node.extra["status"].action, "DEPLOY_SERVER")
    # assertNotIn shows the container contents when the key is present.
    self.assertNotIn("password", node.extra)
def test_create_mcp1_node_response_STR(self):
    # create_node accepts a plain-string password, image id and network id.
    password = "pass123"
    image_id = self.driver.list_images()[0].id
    network_id = self.driver.ex_list_networks()[0].id
    request = dict(
        name="test2",
        image=image_id,
        auth=password,
        ex_description="test2 node",
        ex_network=network_id,
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_response_network_domain(self):
    # create_node with a network domain object, a VLAN object, a CPU
    # specification and a memory size returns the expected node.
    password = NodeAuthPassword("pass123")
    na9 = self.driver.ex_get_location_by_id("NA9")
    image = self.driver.list_images(location=na9)[0]
    domain = self.driver.ex_list_network_domains(location=na9)[0]
    first_vlan = self.driver.ex_list_vlans(location=na9)[0]
    cpu_spec = DimensionDataServerCpuSpecification(
        cpu_count=4, cores_per_socket=1, performance="HIGHPERFORMANCE"
    )
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain=domain,
        ex_vlan=first_vlan,
        ex_is_started=False,
        ex_cpu_specification=cpu_spec,
        ex_memory_gb=4,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_response_network_domain_STR(self):
    # Same as the network-domain variant, but the network domain and VLAN
    # are passed as id strings.
    password = NodeAuthPassword("pass123")
    na9 = self.driver.ex_get_location_by_id("NA9")
    image = self.driver.list_images(location=na9)[0]
    domain_id = self.driver.ex_list_network_domains(location=na9)[0].id
    vlan_id = self.driver.ex_list_vlans(location=na9)[0].id
    cpu_spec = DimensionDataServerCpuSpecification(
        cpu_count=4, cores_per_socket=1, performance="HIGHPERFORMANCE"
    )
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain=domain_id,
        ex_vlan=vlan_id,
        ex_is_started=False,
        ex_cpu_specification=cpu_spec,
        ex_memory_gb=4,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_mcp1_node_no_network(self):
    # create_node with ex_network=None (and no network domain) is expected
    # to raise InvalidRequestError.
    # NOTE(review): InvalidRequestError is not among the visible imports;
    # presumably it is defined elsewhere in this module -- confirm.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network=None,
        ex_is_started=False,
    )
    with self.assertRaises(InvalidRequestError):
        self.driver.create_node(**request)
def test_create_node_mcp1_ipv4(self):
    # create_node with ex_network given as a string plus ex_primary_ipv4
    # returns the expected node.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network="fakenetwork",
        ex_primary_ipv4="10.0.0.1",
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_mcp1_network(self):
    # create_node with only ex_network (as a string) returns the expected
    # node.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network="fakenetwork",
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_mcp2_vlan(self):
    # create_node with ex_network_domain and ex_vlan (both strings) returns
    # the expected node.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_vlan="fakevlan",
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_mcp2_ipv4(self):
    # create_node with ex_network_domain and ex_primary_ipv4 returns the
    # expected node.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_ipv4="10.0.0.1",
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_network_domain_no_vlan_or_ipv4(self):
    # A network domain without a VLAN or a primary IPv4 address is rejected
    # with ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fake_network_domain",
        ex_is_started=False,
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_response(self):
    # create_node with ex_network_domain and ex_primary_nic_vlan returns the
    # expected node.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_vlan="fakevlan",
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_ms_time_zone(self):
    # create_node accepts ex_microsoft_time_zone alongside the network
    # domain and primary NIC VLAN.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_vlan="fakevlan",
        ex_microsoft_time_zone="040",
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_ambigious_mcps_fail(self):
    # Supplying both ex_network_domain and ex_network is rejected with
    # ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
        ex_network="fakenetwork",
        ex_primary_nic_vlan="fakevlan",
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_no_network_domain_fail(self):
    # A primary NIC VLAN without any network domain is rejected with
    # ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(name="test3", image=image, auth=password, ex_primary_nic_vlan="fakevlan")
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_no_primary_nic_fail(self):
    # A network domain without any primary NIC argument is rejected with
    # ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_primary_vlan_nic(self):
    # create_node accepts a primary NIC VLAN together with an explicit
    # network adapter name.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_vlan="fakevlan",
        ex_primary_nic_network_adapter="v1000",
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_primary_ipv4(self):
    # create_node accepts a plain-string password and a primary NIC private
    # IPv4 address.
    password = "pass123"
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_both_primary_nic_and_vlan_fail(self):
    # Supplying both a primary NIC private IPv4 address and a primary NIC
    # VLAN is rejected with ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test3",
        image=image,
        auth=password,
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_primary_nic_vlan="fakevlan",
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_cpu_specification(self):
    # create_node accepts an explicit CPU specification object.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    cpu = DimensionDataServerCpuSpecification(
        cpu_count="4", cores_per_socket="2", performance="STANDARD"
    )
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_is_started=False,
        ex_cpu_specification=cpu,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_memory(self):
    # create_node accepts an explicit memory size via ex_memory_gb.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_is_started=False,
        ex_memory_gb=8,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_disks(self):
    # create_node accepts a list of DimensionDataServerDisk objects via
    # ex_disks.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    disk_list = [DimensionDataServerDisk(scsi_id="0", speed="HIGHPERFORMANCE")]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_is_started=False,
        ex_disks=disk_list,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_disks_fail(self):
    # Passing a plain string as ex_disks is rejected with TypeError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    bad_disks = "blah"
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_is_started=False,
        ex_disks=bad_disks,
    )
    self.assertRaises(TypeError, self.driver.create_node, **request)
def test_create_node_ipv4_gateway(self):
    # create_node accepts an explicit IPv4 gateway via ex_ipv4_gateway.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_is_started=False,
        ex_ipv4_gateway="10.2.2.2",
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_network_domain_no_vlan_no_ipv4_fail(self):
    # A network domain with neither a VLAN nor an IPv4 address is rejected
    # with ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fake_network_domain",
        ex_is_started=False,
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_mcp2_additional_nics_legacy(self):
    # create_node accepts additional NICs given as parallel lists of VLANs
    # and IPv4 addresses (the ex_additional_nics_vlan / _ipv4 arguments).
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    extra_vlans = ["fakevlan1", "fakevlan2"]
    extra_ipv4 = ["10.0.0.2", "10.0.0.3"]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_ipv4="10.0.0.1",
        ex_additional_nics_vlan=extra_vlans,
        ex_additional_nics_ipv4=extra_ipv4,
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_bad_additional_nics_ipv4(self):
    # Passing a plain string as ex_additional_nics_ipv4 is rejected with
    # TypeError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fake_network_domain",
        ex_vlan="fake_vlan",
        ex_additional_nics_ipv4="badstring",
        ex_is_started=False,
    )
    self.assertRaises(TypeError, self.driver.create_node, **request)
def test_create_node_additional_nics(self):
    # create_node accepts a list of DimensionDataNic objects, one defined by
    # VLAN and one by private IPv4 address.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    vlan_nic = DimensionDataNic(vlan="fake_vlan", network_adapter_name="v1000")
    ipv4_nic = DimensionDataNic(private_ip_v4="10.1.1.2", network_adapter_name="v1000")
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_additional_nics=[vlan_nic, ipv4_nic],
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_create_node_additional_nics_vlan_ipv4_coexist_fail(self):
    # Additional NICs that carry both a private IPv4 address and a VLAN are
    # rejected with ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    first_nic = DimensionDataNic(
        private_ip_v4="10.1.1.1", vlan="fake_vlan", network_adapter_name="v1000"
    )
    second_nic = DimensionDataNic(
        private_ip_v4="10.1.1.2", vlan="fake_vlan2", network_adapter_name="v1000"
    )
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_additional_nics=[first_nic, second_nic],
        ex_is_started=False,
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_additional_nics_invalid_input_fail(self):
    # Passing a plain string as ex_additional_nics is rejected with
    # TypeError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    bad_nics = "blah"
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_additional_nics=bad_nics,
        ex_is_started=False,
    )
    self.assertRaises(TypeError, self.driver.create_node, **request)
def test_create_node_additional_nics_vlan_ipv4_not_exist_fail(self):
    # Additional NICs that specify neither a VLAN nor a private IPv4 address
    # are rejected with ValueError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    first_nic = DimensionDataNic(network_adapter_name="v1000")
    second_nic = DimensionDataNic(network_adapter_name="v1000")
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fakenetworkdomain",
        ex_primary_nic_private_ipv4="10.0.0.1",
        ex_additional_nics=[first_nic, second_nic],
        ex_is_started=False,
    )
    self.assertRaises(ValueError, self.driver.create_node, **request)
def test_create_node_bad_additional_nics_vlan(self):
    # Passing a plain string as ex_additional_nics_vlan is rejected with
    # TypeError.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test2 node",
        ex_network_domain="fake_network_domain",
        ex_vlan="fake_vlan",
        ex_additional_nics_vlan="badstring",
        ex_is_started=False,
    )
    self.assertRaises(TypeError, self.driver.create_node, **request)
def test_create_node_mcp2_indicate_dns(self):
    # create_node accepts explicit primary and secondary DNS servers
    # together with a network domain and primary IPv4 address.
    password = NodeAuthPassword("pass123")
    image = self.driver.list_images()[0]
    request = dict(
        name="test2",
        image=image,
        auth=password,
        ex_description="test node dns",
        ex_network_domain="fakenetworkdomain",
        ex_primary_ipv4="10.0.0.1",
        ex_primary_dns="8.8.8.8",
        ex_secondary_dns="8.8.4.4",
        ex_is_started=False,
    )
    created = self.driver.create_node(**request)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
    self.assertEqual(created.extra["status"].action, "DEPLOY_SERVER")
def test_ex_shutdown_graceful(self):
    # ex_shutdown_graceful on a node returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_shutdown_graceful(node), True)
def test_ex_shutdown_graceful_INPROGRESS(self):
    # Under the "INPROGRESS" mock type ex_shutdown_graceful raises
    # DimensionDataAPIException.
    DimensionDataMockHttp.type = "INPROGRESS"
    node_fields = dict(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    target = Node(**node_fields)
    self.assertRaises(DimensionDataAPIException, self.driver.ex_shutdown_graceful, target)
def test_ex_start_node(self):
    # ex_start_node on a node returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_start_node(node), True)
def test_ex_start_node_INPROGRESS(self):
    # Under the "INPROGRESS" mock type ex_start_node raises
    # DimensionDataAPIException.
    DimensionDataMockHttp.type = "INPROGRESS"
    node_fields = dict(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    target = Node(**node_fields)
    self.assertRaises(DimensionDataAPIException, self.driver.ex_start_node, target)
def test_ex_power_off(self):
    # ex_power_off on a node returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_power_off(node), True)
def test_ex_update_vm_tools(self):
    # ex_update_vm_tools on a node returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_update_vm_tools(node), True)
def test_ex_power_off_INPROGRESS(self):
    # Under the "INPROGRESS" mock type ex_power_off raises
    # DimensionDataAPIException. The node here is built with state
    # "STOPPING".
    DimensionDataMockHttp.type = "INPROGRESS"
    node_fields = dict(
        id="11",
        name=None,
        state="STOPPING",
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    target = Node(**node_fields)
    self.assertRaises(DimensionDataAPIException, self.driver.ex_power_off, target)
def test_ex_reset(self):
    # ex_reset on a node returns exactly True.
    node = Node(
        id="11",
        name=None,
        state=None,
        public_ips=None,
        private_ips=None,
        driver=self.driver,
    )
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_reset(node), True)
def test_ex_attach_node_to_vlan(self):
    # Attaching a fetched node to a fetched VLAN returns exactly True.
    node = self.driver.ex_get_node_by_id("e75ead52-692f-4314-8725-c8a4f4d13a87")
    vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
    # assertIs keeps the strict identity check and shows the actual value
    # when it fails.
    self.assertIs(self.driver.ex_attach_node_to_vlan(node, vlan), True)
def test_ex_destroy_nic(self):
    # ex_destroy_nic on a NIC id returns a truthy result.
    outcome = self.driver.ex_destroy_nic("a202e51b-41c0-4cfc-add0-b1c62fc0ecf6")
    self.assertTrue(outcome)
def test_list_networks(self):
    # list_networks returns networks whose first entry is "test-net1" and
    # whose location is a NodeLocation.
    networks = self.driver.list_networks()
    first = networks[0]
    self.assertEqual(first.name, "test-net1")
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(first.location, NodeLocation)
def test_ex_create_network(self):
    # ex_create_network with a description returns a network carrying the
    # expected id and the requested name.
    na9 = self.driver.ex_get_location_by_id("NA9")
    created = self.driver.ex_create_network(na9, "Test Network", "test")
    self.assertEqual(created.id, "208e3a8e-9d2f-11e2-b29c-001517c4643e")
    self.assertEqual(created.name, "Test Network")
def test_ex_create_network_NO_DESCRIPTION(self):
    # ex_create_network also works when the description argument is
    # omitted.
    na9 = self.driver.ex_get_location_by_id("NA9")
    created = self.driver.ex_create_network(na9, "Test Network")
    self.assertEqual(created.id, "208e3a8e-9d2f-11e2-b29c-001517c4643e")
    self.assertEqual(created.name, "Test Network")
def test_ex_delete_network(self):
    # Deleting the first listed network returns a truthy result.
    first_network = self.driver.ex_list_networks()[0]
    outcome = self.driver.ex_delete_network(first_network)
    self.assertTrue(outcome)
def test_ex_rename_network(self):
    # Renaming the first listed network returns a truthy result.
    first_network = self.driver.ex_list_networks()[0]
    outcome = self.driver.ex_rename_network(first_network, "barry")
    self.assertTrue(outcome)
def test_ex_create_network_domain(self):
    # ex_create_network_domain with a description returns a domain with the
    # requested name and the expected id.
    location = self.driver.ex_get_location_by_id("NA9")
    plan = NetworkDomainServicePlan.ADVANCED
    net = self.driver.ex_create_network_domain(
        location=location, name="test", description="test", service_plan=plan
    )
    self.assertEqual(net.name, "test")
    # assertTrue(value, msg) only checks truthiness and treats the second
    # argument as the failure message, so the id was never compared before.
    self.assertEqual(net.id, "f14a871f-9a25-470c-aef8-51e13202e1aa")
def test_ex_create_network_domain_NO_DESCRIPTION(self):
    # ex_create_network_domain without a description returns a domain with
    # the requested name and the expected id.
    location = self.driver.ex_get_location_by_id("NA9")
    plan = NetworkDomainServicePlan.ADVANCED
    net = self.driver.ex_create_network_domain(
        location=location, name="test", service_plan=plan
    )
    self.assertEqual(net.name, "test")
    # assertTrue(value, msg) only checks truthiness and treats the second
    # argument as the failure message, so the id was never compared before.
    self.assertEqual(net.id, "f14a871f-9a25-470c-aef8-51e13202e1aa")
def test_ex_get_network_domain(self):
    # Fetching a network domain by id returns the expected id, description
    # and name.
    domain_id = "8cdfd607-f429-4df6-9352-162cfc0891be"
    domain = self.driver.ex_get_network_domain(domain_id)
    self.assertEqual(domain.id, "8cdfd607-f429-4df6-9352-162cfc0891be")
    self.assertEqual(domain.description, "test2")
    self.assertEqual(domain.name, "test")
def test_ex_update_network_domain(self):
    # Updating a network domain after changing its name returns a domain
    # carrying the new name.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    domain.name = "new name"
    updated = self.driver.ex_update_network_domain(domain)
    self.assertEqual(updated.name, "new name")
def test_ex_delete_network_domain(self):
    # Deleting a fetched network domain returns a truthy result.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    outcome = self.driver.ex_delete_network_domain(domain)
    self.assertTrue(outcome)
def test_ex_list_networks(self):
    # ex_list_networks returns networks whose first entry is "test-net1"
    # and whose location is a NodeLocation.
    networks = self.driver.ex_list_networks()
    first = networks[0]
    self.assertEqual(first.name, "test-net1")
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(first.location, NodeLocation)
def test_ex_list_network_domains(self):
    # ex_list_network_domains returns domains whose first entry is "Aurora"
    # and whose location is a NodeLocation.
    domains = self.driver.ex_list_network_domains()
    first = domains[0]
    self.assertEqual(first.name, "Aurora")
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(first.location, NodeLocation)
def test_ex_list_network_domains_ALLFILTERS(self):
    # Under the "ALLFILTERS" mock type, listing network domains with
    # location, name, service plan and state filters still returns "Aurora"
    # first.
    DimensionDataMockHttp.type = "ALLFILTERS"
    domains = self.driver.ex_list_network_domains(
        location="fake_location",
        name="fake_name",
        service_plan="fake_plan",
        state="fake_state",
    )
    first = domains[0]
    self.assertEqual(first.name, "Aurora")
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(first.location, NodeLocation)
def test_ex_list_vlans(self):
    # The first VLAN returned by ex_list_vlans is named "Primary".
    first_vlan = self.driver.ex_list_vlans()[0]
    self.assertEqual(first_vlan.name, "Primary")
def test_ex_list_vlans_ALLFILTERS(self):
    # Under the "ALLFILTERS" mock type, listing VLANs with every filter
    # argument used here still returns "Primary" first.
    DimensionDataMockHttp.type = "ALLFILTERS"
    filters = dict(
        location="fake_location",
        network_domain="fake_network_domain",
        name="fake_name",
        ipv4_address="fake_ipv4",
        ipv6_address="fake_ipv6",
        state="fake_state",
    )
    matched = self.driver.ex_list_vlans(**filters)
    self.assertEqual(matched[0].name, "Primary")
def test_ex_create_vlan(self):
    # Create a VLAN (with a description) inside a fetched network domain and
    # check the id of the returned VLAN.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    created = self.driver.ex_create_vlan(
        network_domain=domain,
        name="test",
        private_ipv4_base_address="10.3.4.0",
        private_ipv4_prefix_size="24",
        description="test vlan",
    )
    self.assertEqual(created.id, "0e56433f-d808-4669-821d-812769517ff8")
def test_ex_create_vlan_NO_DESCRIPTION(self):
    # Same as the plain create test, but the description argument is omitted.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    created = self.driver.ex_create_vlan(
        network_domain=domain,
        name="test",
        private_ipv4_base_address="10.3.4.0",
        private_ipv4_prefix_size="24",
    )
    self.assertEqual(created.id, "0e56433f-d808-4669-821d-812769517ff8")
def test_ex_get_vlan(self):
    # Fetch one VLAN by id and verify identity, status and the IPv4/IPv6
    # addressing attributes exposed on the returned object.
    vlan_id = "0e56433f-d808-4669-821d-812769517ff8"
    vlan = self.driver.ex_get_vlan(vlan_id)

    # Identity and state.
    self.assertEqual(vlan.id, vlan_id)
    self.assertEqual(vlan.description, "test2")
    self.assertEqual(vlan.status, "NORMAL")
    self.assertEqual(vlan.name, "Production VLAN")

    # Address ranges.
    self.assertEqual(vlan.private_ipv4_range_address, "10.0.3.0")
    self.assertEqual(vlan.private_ipv4_range_size, 24)
    self.assertEqual(vlan.ipv6_range_size, 64)
    self.assertEqual(vlan.ipv6_range_address, "2607:f480:1111:1153:0:0:0:0")

    # Gateways.
    self.assertEqual(vlan.ipv4_gateway, "10.0.3.1")
    self.assertEqual(vlan.ipv6_gateway, "2607:f480:1111:1153:0:0:0:1")
def test_ex_wait_for_state(self):
    # Poll ex_get_vlan until the VLAN reports "NORMAL"; the test passes as
    # long as the call returns without raising.
    fetch_vlan = self.driver.ex_get_vlan
    self.driver.ex_wait_for_state(
        "NORMAL",
        fetch_vlan,
        vlan_id="0e56433f-d808-4669-821d-812769517ff8",
        poll_interval=0.1,
    )
def test_ex_wait_for_state_NODE(self):
    # Poll ex_get_node_by_id until the node reports "running"; the test
    # passes as long as the call returns without raising.
    fetch_node = self.driver.ex_get_node_by_id
    self.driver.ex_wait_for_state(
        "running",
        fetch_node,
        id="e75ead52-692f-4314-8725-c8a4f4d13a87",
        poll_interval=0.1,
    )
def test_ex_wait_for_state_FAIL(self):
    # Waiting for a state the node never reaches must raise
    # DimensionDataAPIException once the (very short) timeout elapses.
    with self.assertRaises(DimensionDataAPIException) as context:
        self.driver.ex_wait_for_state(
            "starting",
            self.driver.ex_get_node_by_id,
            id="e75ead52-692f-4314-8725-c8a4f4d13a87",
            poll_interval=0.1,
            timeout=0.1,
        )
    # The exception code is expected to equal "running" (the state the node
    # is also expected to reach in test_ex_wait_for_state_NODE).
    self.assertEqual(context.exception.code, "running")
    # assertIn shows both operands on failure, unlike assertTrue(x in y).
    self.assertIn("timed out", context.exception.msg)
def test_ex_update_vlan(self):
    # Rename a fetched VLAN and check the updated object keeps the new name.
    renamed = "new name"
    original = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
    original.name = renamed
    updated = self.driver.ex_update_vlan(original)
    self.assertEqual(updated.name, renamed)
def test_ex_delete_vlan(self):
    # Deleting a fetched VLAN must report a truthy result.
    target = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
    self.assertTrue(self.driver.ex_delete_vlan(target))
def test_ex_expand_vlan(self):
    # Set a new private IPv4 prefix size on a fetched VLAN, expand it, and
    # check the returned VLAN carries that size.
    new_size = "23"
    original = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
    original.private_ipv4_range_size = new_size
    expanded = self.driver.ex_expand_vlan(original)
    self.assertEqual(expanded.private_ipv4_range_size, new_size)
def test_ex_add_public_ip_block_to_network_domain(self):
    # Adding a public IP block to a network domain returns the new block.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    new_block = self.driver.ex_add_public_ip_block_to_network_domain(domain)
    self.assertEqual(new_block.id, "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
def test_ex_list_public_ip_blocks(self):
    # List the public IP blocks of a network domain and verify the first
    # entry, including its back-reference to that domain.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    listed = self.driver.ex_list_public_ip_blocks(domain)
    self.assertEqual(listed[0].base_ip, "168.128.4.18")
    self.assertEqual(listed[0].size, "2")
    self.assertEqual(listed[0].id, "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
    self.assertEqual(listed[0].location.id, "NA9")
    self.assertEqual(listed[0].network_domain.id, domain.id)
def test_ex_get_public_ip_block(self):
    # Fetch one public IP block by id and verify its fields, including the
    # network domain it points back to.
    block_id = "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8"
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_public_ip_block(block_id)
    self.assertEqual(fetched.base_ip, "168.128.4.18")
    self.assertEqual(fetched.size, "2")
    self.assertEqual(fetched.id, block_id)
    self.assertEqual(fetched.location.id, "NA9")
    self.assertEqual(fetched.network_domain.id, domain.id)
def test_ex_delete_public_ip_block(self):
    # Deleting a fetched public IP block must report a truthy result.
    target = self.driver.ex_get_public_ip_block("9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
    self.assertTrue(self.driver.ex_delete_public_ip_block(target))
def test_ex_list_firewall_rules(self):
    # List the firewall rules of a network domain and verify every checked
    # attribute of the first rule.
    domain_id = "8cdfd607-f429-4df6-9352-162cfc0891be"
    domain = self.driver.ex_get_network_domain(domain_id)
    listed = self.driver.ex_list_firewall_rules(domain)
    self.assertEqual(listed[0].id, "756cba02-b0bc-48f4-aea5-9445870b6148")
    self.assertEqual(listed[0].network_domain.id, domain_id)
    self.assertEqual(listed[0].name, "CCDEFAULT.BlockOutboundMailIPv4")
    self.assertEqual(listed[0].action, "DROP")
    self.assertEqual(listed[0].ip_version, "IPV4")
    self.assertEqual(listed[0].protocol, "TCP")
    self.assertEqual(listed[0].source.ip_address, "ANY")
    self.assertTrue(listed[0].source.any_ip)
    self.assertTrue(listed[0].destination.any_ip)
def test_ex_create_firewall_rule(self):
    # Re-create the first listed rule at position "FIRST" and check the id
    # of the rule that comes back.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]
    created = self.driver.ex_create_firewall_rule(domain, template, "FIRST")
    self.assertEqual(created.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_with_specific_source_ip(self):
    # Create a rule from the listed rule named "SpecificSourceIP", unchanged.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    listed = self.driver.ex_list_firewall_rules(domain)
    # Indexing [0] keeps the IndexError if no rule with that name is listed.
    template = [r for r in listed if r.name == "SpecificSourceIP"][0]
    created = self.driver.ex_create_firewall_rule(domain, template, "FIRST")
    self.assertEqual(created.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_with_source_ip(self):
    # Create a rule whose source is an explicit address plus prefix size.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    listed = self.driver.ex_list_firewall_rules(domain)
    # Indexing [0] keeps the IndexError if no rule with that name is listed.
    template = [r for r in listed if r.name == "SpecificSourceIP"][0]
    template.source.any_ip = False
    template.source.ip_address = "10.0.0.1"
    template.source.ip_prefix_size = "15"
    created = self.driver.ex_create_firewall_rule(domain, template, "FIRST")
    self.assertEqual(created.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_with_any_ip(self):
    # Create a rule whose source is flagged as matching any IP.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    listed = self.driver.ex_list_firewall_rules(domain)
    # Indexing [0] keeps the IndexError if no rule with that name is listed.
    template = [r for r in listed if r.name == "SpecificSourceIP"][0]
    template.source.any_ip = True
    created = self.driver.ex_create_firewall_rule(domain, template, "FIRST")
    self.assertEqual(created.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_ip_prefix_size(self):
    # Create a rule where both source and destination are given as an
    # explicit address with a prefix size (no address list, not "any").
    # Passes as long as the create call does not raise.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]

    # Source endpoint.
    template.source.address_list_id = None
    template.source.any_ip = False
    template.source.ip_address = "10.2.1.1"
    template.source.ip_prefix_size = "10"

    # Destination endpoint.
    template.destination.address_list_id = None
    template.destination.any_ip = False
    template.destination.ip_address = "10.0.0.1"
    template.destination.ip_prefix_size = "20"

    self.driver.ex_create_firewall_rule(domain, template, "LAST")
def test_ex_create_firewall_rule_address_list(self):
    # Create a rule whose source and destination both reference an address
    # list id. Passes as long as the create call does not raise.
    list_id = "12345"
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]
    template.source.address_list_id = list_id
    template.destination.address_list_id = list_id
    self.driver.ex_create_firewall_rule(domain, template, "LAST")
def test_ex_create_firewall_rule_port_list(self):
    # Create a rule whose source and destination both reference a port list
    # id. Passes as long as the create call does not raise.
    list_id = "12345"
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]
    template.source.port_list_id = list_id
    template.destination.port_list_id = list_id
    self.driver.ex_create_firewall_rule(domain, template, "LAST")
def test_ex_create_firewall_rule_port(self):
    # Create a rule whose source and destination use explicit begin/end
    # ports instead of a port list. Passes as long as the create call does
    # not raise.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]

    # Source ports.
    template.source.port_list_id = None
    template.source.port_begin = "8000"
    template.source.port_end = "8005"

    # Destination ports.
    template.destination.port_list_id = None
    template.destination.port_begin = "7000"
    template.destination.port_end = "7005"

    self.driver.ex_create_firewall_rule(domain, template, "LAST")
def test_ex_create_firewall_rule_ALL_VALUES(self):
    # Feed every listed rule back through the create call at position
    # "LAST". Passes as long as none of the calls raises.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    for existing in self.driver.ex_list_firewall_rules(domain):
        self.driver.ex_create_firewall_rule(domain, existing, "LAST")
def test_ex_create_firewall_rule_WITH_POSITION_RULE(self):
    # Create the second-to-last listed rule "BEFORE" the last one, passing
    # the relative rule as an object.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    listed = self.driver.ex_list_firewall_rules(domain)
    template, anchor = listed[-2], listed[-1]
    created = self.driver.ex_create_firewall_rule(domain, template, "BEFORE", anchor)
    self.assertEqual(created.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_WITH_POSITION_RULE_STR(self):
    # Create the second-to-last listed rule "BEFORE" another rule, passing
    # the relative rule as a string.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[-2]
    anchor_name = "RULE_WITH_SOURCE_AND_DEST"
    created = self.driver.ex_create_firewall_rule(domain, template, "BEFORE", anchor_name)
    self.assertEqual(created.id, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
def test_ex_create_firewall_rule_FAIL_POSITION(self):
    # Position "BEFORE" with no relative rule supplied must raise ValueError.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]
    with self.assertRaises(ValueError):
        self.driver.ex_create_firewall_rule(domain, template, "BEFORE")
def test_ex_create_firewall_rule_FAIL_POSITION_WITH_RULE(self):
    # Position "LAST" combined with a relative rule must raise ValueError.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    template = self.driver.ex_list_firewall_rules(domain)[0]
    anchor_name = "RULE_WITH_SOURCE_AND_DEST"
    with self.assertRaises(ValueError):
        self.driver.ex_create_firewall_rule(domain, template, "LAST", anchor_name)
def test_ex_get_firewall_rule(self):
    # Fetch a firewall rule by id within a network domain and check its id.
    rule_id = "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, rule_id)
    self.assertEqual(fetched.id, rule_id)
def test_ex_set_firewall_rule_state(self):
    # Setting a fetched rule's state to False must report a truthy result.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    self.assertTrue(self.driver.ex_set_firewall_rule_state(fetched, False))
def test_ex_delete_firewall_rule(self):
    # Deleting a fetched firewall rule must report a truthy result.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    self.assertTrue(self.driver.ex_delete_firewall_rule(fetched))
def test_ex_edit_firewall_rule(self):
    # Edit a fetched rule so its source matches any IP, placed "LAST".
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.source.any_ip = True
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_source_ipaddresslist(self):
    # Edit a fetched rule so its source references an address list id (an
    # explicit address and prefix size are set on the source as well).
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.source.address_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
    fetched.source.any_ip = False
    fetched.source.ip_address = "10.0.0.1"
    fetched.source.ip_prefix_size = 10
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_destination_ipaddresslist(self):
    # Edit a fetched rule so its destination references an address list id
    # (an explicit address and prefix size are set on the destination too).
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.destination.address_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
    fetched.destination.any_ip = False
    fetched.destination.ip_address = "10.0.0.1"
    fetched.destination.ip_prefix_size = 10
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_destination_ipaddress(self):
    # Edit a fetched rule using an explicit address and prefix size with no
    # address list.
    # NOTE(review): despite the name, this test modifies rule.source, while
    # test_ex_edit_firewall_rule_source_ipaddress modifies rule.destination;
    # the two names look swapped - confirm before renaming.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.source.address_list_id = None
    fetched.source.any_ip = False
    fetched.source.ip_address = "10.0.0.1"
    fetched.source.ip_prefix_size = "10"
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_source_ipaddress(self):
    # Edit a fetched rule using an explicit address and prefix size with no
    # address list.
    # NOTE(review): despite the name, this test modifies rule.destination,
    # while test_ex_edit_firewall_rule_destination_ipaddress modifies
    # rule.source; the two names look swapped - confirm before renaming.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.destination.address_list_id = None
    fetched.destination.any_ip = False
    fetched.destination.ip_address = "10.0.0.1"
    fetched.destination.ip_prefix_size = "10"
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_with_relative_rule(self):
    # Edit a rule to sit "BEFORE" the last listed rule, passing the relative
    # rule as an object.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    anchor = self.driver.ex_list_firewall_rules(network_domain=domain)[-1]
    outcome = self.driver.ex_edit_firewall_rule(
        rule=fetched,
        position="BEFORE",
        relative_rule_for_position=anchor,
    )
    self.assertTrue(outcome)
def test_ex_edit_firewall_rule_with_relative_rule_by_name(self):
    # Edit a rule to sit "BEFORE" the last listed rule, passing only the
    # relative rule's name.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    anchor = self.driver.ex_list_firewall_rules(network_domain=domain)[-1]
    outcome = self.driver.ex_edit_firewall_rule(
        rule=fetched,
        position="BEFORE",
        relative_rule_for_position=anchor.name,
    )
    self.assertTrue(outcome)
def test_ex_edit_firewall_rule_source_portlist(self):
    # Edit a fetched rule so its source references a port list id.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.source.port_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_source_port(self):
    # Edit a fetched rule so its source uses explicit begin/end ports and no
    # port list.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.source.port_list_id = None
    fetched.source.port_begin = "3"
    fetched.source.port_end = "10"
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_destination_portlist(self):
    # Edit a fetched rule so its destination references a port list id.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.destination.port_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_destination_port(self):
    # Edit a fetched rule so its destination uses explicit begin/end ports
    # and no port list.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    fetched.destination.port_list_id = None
    fetched.destination.port_begin = "3"
    fetched.destination.port_end = "10"
    self.assertTrue(self.driver.ex_edit_firewall_rule(rule=fetched, position="LAST"))
def test_ex_edit_firewall_rule_invalid_position_fail(self):
    # Position "BEFORE" with no relative rule supplied must raise ValueError.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    with self.assertRaises(ValueError):
        self.driver.ex_edit_firewall_rule(rule=fetched, position="BEFORE")
def test_ex_edit_firewall_rule_invalid_position_relative_rule_fail(self):
    # Position "FIRST" combined with a relative rule must raise ValueError.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_firewall_rule(domain, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
    anchor = self.driver.ex_list_firewall_rules(network_domain=domain)[-1]
    with self.assertRaises(ValueError):
        self.driver.ex_edit_firewall_rule(
            rule=fetched,
            position="FIRST",
            relative_rule_for_position=anchor,
        )
def test_ex_create_nat_rule(self):
    # Create a NAT rule from two IP addresses and check the id of the rule
    # that comes back.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    created = self.driver.ex_create_nat_rule(domain, "1.2.3.4", "4.3.2.1")
    self.assertEqual(created.id, "d31c2db0-be6b-4d50-8744-9a7a534b5fba")
def test_ex_list_nat_rules(self):
    # List the NAT rules of a network domain and verify the first entry.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    listed = self.driver.ex_list_nat_rules(domain)
    self.assertEqual(listed[0].id, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
    self.assertEqual(listed[0].internal_ip, "10.0.0.15")
    self.assertEqual(listed[0].external_ip, "165.180.12.18")
def test_ex_get_nat_rule(self):
    # Fetch one NAT rule by id and verify its id and both IP addresses.
    nat_id = "2187a636-7ebb-49a1-a2ff-5d617f496dce"
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_nat_rule(domain, nat_id)
    self.assertEqual(fetched.id, nat_id)
    self.assertEqual(fetched.internal_ip, "10.0.0.16")
    self.assertEqual(fetched.external_ip, "165.180.12.19")
def test_ex_delete_nat_rule(self):
    # Deleting a fetched NAT rule must report a truthy result.
    domain = self.driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
    fetched = self.driver.ex_get_nat_rule(domain, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
    self.assertTrue(self.driver.ex_delete_nat_rule(fetched))
def test_ex_enable_monitoring(self):
    # Enabling "ADVANCED" monitoring on the first listed node must report a
    # truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_enable_monitoring(first_node, "ADVANCED"))
def test_ex_disable_monitoring(self):
    # Disabling monitoring on the first listed node must report a truthy
    # result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_disable_monitoring(first_node))
def test_ex_change_monitoring_plan(self):
    # Switching the first listed node to the "ESSENTIALS" monitoring plan
    # must report a truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_update_monitoring_plan(first_node, "ESSENTIALS"))
def test_ex_add_storage_to_node(self):
    # Adding storage (30, "PERFORMANCE") to the first listed node must
    # report a truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_add_storage_to_node(first_node, 30, "PERFORMANCE"))
def test_ex_remove_storage_from_node(self):
    # Removing storage (second argument 0) from the first listed node must
    # report a truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_remove_storage_from_node(first_node, 0))
def test_ex_change_storage_speed(self):
    # Changing storage speed to "PERFORMANCE" on the first listed node must
    # report a truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_change_storage_speed(first_node, 1, "PERFORMANCE"))
def test_ex_change_storage_size(self):
    # Changing storage size (arguments 1, 100) on the first listed node must
    # report a truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_change_storage_size(first_node, 1, 100))
def test_ex_clone_node_to_image(self):
    # Cloning the first listed node to an image must report a truthy result.
    first_node = self.driver.list_nodes()[0]
    outcome = self.driver.ex_clone_node_to_image(first_node, "my image", "a description")
    self.assertTrue(outcome)
def test_ex_update_node(self):
    # Updating the first listed node must report a truthy result.
    first_node = self.driver.list_nodes()[0]
    outcome = self.driver.ex_update_node(first_node, "my new name", "a description", 2, 4048)
    self.assertTrue(outcome)
def test_ex_reconfigure_node(self):
    # Reconfiguring the first listed node must report a truthy result.
    first_node = self.driver.list_nodes()[0]
    outcome = self.driver.ex_reconfigure_node(first_node, 4, 4, 1, "HIGHPERFORMANCE")
    self.assertTrue(outcome)
def test_ex_get_location_by_id(self):
    # Looking up location "NA9" must return a location with that id.
    location = self.driver.ex_get_location_by_id("NA9")
    # Previously assertTrue(location.id, "NA9"), which only checked that the
    # id was truthy and used "NA9" as the failure message; assertEqual
    # actually compares the id.
    self.assertEqual(location.id, "NA9")
def test_ex_get_location_by_id_NO_LOCATION(self):
    # Passing None as the location id yields None.
    self.assertIsNone(self.driver.ex_get_location_by_id(None))
def test_ex_get_base_image_by_id(self):
    # Fetch the first listed image by id via the base-image lookup; its
    # OS_type extra is expected to be "UNIX".
    first_image = self.driver.list_images()[0]
    fetched = self.driver.ex_get_base_image_by_id(first_image.id)
    self.assertEqual(fetched.extra["OS_type"], "UNIX")
def test_ex_get_customer_image_by_id(self):
    # Fetch the second listed customer image by id; its OS_type extra is
    # expected to be "WINDOWS".
    second_image = self.driver.ex_list_customer_images()[1]
    fetched = self.driver.ex_get_customer_image_by_id(second_image.id)
    self.assertEqual(fetched.extra["OS_type"], "WINDOWS")
def test_ex_get_image_by_id_base_img(self):
    # Fetch the second listed image by id; its OS_type extra is expected to
    # be "WINDOWS".
    # NOTE(review): the name suggests a generic get-image-by-id lookup, but
    # the body calls ex_get_base_image_by_id - confirm which is intended.
    second_image = self.driver.list_images()[1]
    fetched = self.driver.ex_get_base_image_by_id(second_image.id)
    self.assertEqual(fetched.extra["OS_type"], "WINDOWS")
def test_ex_get_image_by_id_customer_img(self):
    # Fetch the first listed customer image by id; its OS_type extra is
    # expected to be "UNIX".
    # NOTE(review): the name suggests a generic get-image-by-id lookup, but
    # the body calls ex_get_customer_image_by_id - confirm which is intended.
    first_image = self.driver.ex_list_customer_images()[0]
    fetched = self.driver.ex_get_customer_image_by_id(first_image.id)
    self.assertEqual(fetched.extra["OS_type"], "UNIX")
def test_ex_get_image_by_id_customer_FAIL(self):
    # An unknown image id must raise DimensionDataAPIException.
    # NOTE(review): the name mentions "customer", but the body calls
    # ex_get_base_image_by_id - confirm which lookup is intended.
    unknown_id = "FAKE_IMAGE_ID"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_get_base_image_by_id(unknown_id)
def test_ex_create_anti_affinity_rule(self):
    # Create an anti-affinity rule from a list holding the first two nodes.
    nodes = self.driver.list_nodes()
    pair = [nodes[0], nodes[1]]
    self.assertTrue(self.driver.ex_create_anti_affinity_rule(pair))
def test_ex_create_anti_affinity_rule_TUPLE(self):
    # Create an anti-affinity rule from a tuple holding the first two nodes.
    nodes = self.driver.list_nodes()
    pair = (nodes[0], nodes[1])
    self.assertTrue(self.driver.ex_create_anti_affinity_rule(pair))
def test_ex_create_anti_affinity_rule_TUPLE_STR(self):
    # Create an anti-affinity rule from a tuple of the first two node ids.
    nodes = self.driver.list_nodes()
    id_pair = (nodes[0].id, nodes[1].id)
    self.assertTrue(self.driver.ex_create_anti_affinity_rule(id_pair))
def test_ex_create_anti_affinity_rule_FAIL_STR(self):
    # Passing a plain string instead of a node collection must raise
    # TypeError.
    not_a_collection = "string"
    with self.assertRaises(TypeError):
        self.driver.ex_create_anti_affinity_rule(not_a_collection)
def test_ex_create_anti_affinity_rule_FAIL_EXISTING(self):
    # With the "FAIL_EXISTING" mock type active, creating the rule must
    # raise DimensionDataAPIException.
    nodes = self.driver.list_nodes()
    # Switch the mock type only after the node listing above.
    DimensionDataMockHttp.type = "FAIL_EXISTING"
    pair = (nodes[0], nodes[1])
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_create_anti_affinity_rule(pair)
def test_ex_delete_anti_affinity_rule(self):
    # Delete the first anti-affinity rule of the first network domain,
    # passing the rule object.
    domain = self.driver.ex_list_network_domains()[0]
    first_rule = self.driver.ex_list_anti_affinity_rules(network_domain=domain)[0]
    self.assertTrue(self.driver.ex_delete_anti_affinity_rule(first_rule))
def test_ex_delete_anti_affinity_rule_STR(self):
    # Delete the first anti-affinity rule of the first network domain,
    # passing only the rule id.
    domain = self.driver.ex_list_network_domains()[0]
    first_rule = self.driver.ex_list_anti_affinity_rules(network_domain=domain)[0]
    self.assertTrue(self.driver.ex_delete_anti_affinity_rule(first_rule.id))
def test_ex_delete_anti_affinity_rule_FAIL(self):
    # With the "FAIL" mock type active, deleting the rule must raise
    # DimensionDataAPIException.
    domain = self.driver.ex_list_network_domains()[0]
    first_rule = self.driver.ex_list_anti_affinity_rules(network_domain=domain)[0]
    # Switch the mock type only after the listings above.
    DimensionDataMockHttp.type = "FAIL"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_delete_anti_affinity_rule(first_rule)
def test_ex_list_anti_affinity_rules_NETWORK_DOMAIN(self):
    # List anti-affinity rules filtered by network domain: expect a list of
    # two rules whose id is a str and whose node_list is a list.
    net_domain = self.driver.ex_list_network_domains()[0]
    rules = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(rules, list)
    self.assertEqual(len(rules), 2)
    self.assertIsInstance(rules[0].id, str)
    self.assertIsInstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_NETWORK(self):
    # List anti-affinity rules filtered by network: expect a list of two
    # rules whose id is a str and whose node_list is a list.
    network = self.driver.list_networks()[0]
    rules = self.driver.ex_list_anti_affinity_rules(network=network)
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(rules, list)
    self.assertEqual(len(rules), 2)
    self.assertIsInstance(rules[0].id, str)
    self.assertIsInstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_NODE(self):
    # List anti-affinity rules filtered by node: expect a list of two rules
    # whose id is a str and whose node_list is a list.
    node = self.driver.list_nodes()[0]
    rules = self.driver.ex_list_anti_affinity_rules(node=node)
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(rules, list)
    self.assertEqual(len(rules), 2)
    self.assertIsInstance(rules[0].id, str)
    self.assertIsInstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_PAGINATED(self):
    # With the "PAGINATED" mock type active, the listing is expected to
    # contain four rules.
    net_domain = self.driver.ex_list_network_domains()[0]
    # Switch the mock type only after the network domain listing above.
    DimensionDataMockHttp.type = "PAGINATED"
    rules = self.driver.ex_list_anti_affinity_rules(network_domain=net_domain)
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(rules, list)
    self.assertEqual(len(rules), 4)
    self.assertIsInstance(rules[0].id, str)
    self.assertIsInstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_ALLFILTERS(self):
    # List anti-affinity rules with the id and state filters supplied, using
    # the "ALLFILTERS" mock response type.
    net_domain = self.driver.ex_list_network_domains()[0]
    # Switch the mock type only after the network domain listing above.
    DimensionDataMockHttp.type = "ALLFILTERS"
    rules = self.driver.ex_list_anti_affinity_rules(
        network_domain=net_domain, filter_id="FAKE_ID", filter_state="FAKE_STATE"
    )
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(rules, list)
    self.assertEqual(len(rules), 2)
    self.assertIsInstance(rules[0].id, str)
    self.assertIsInstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_BAD_ARGS(self):
    # Supplying both network and network_domain must raise ValueError.
    conflicting = {
        "network": "fake_network",
        "network_domain": "fake_network_domain",
    }
    with self.assertRaises(ValueError):
        self.driver.ex_list_anti_affinity_rules(**conflicting)
def test_ex_create_tag_key(self):
    # Creating a tag key with only a name must report a truthy result.
    self.assertTrue(self.driver.ex_create_tag_key("MyTestKey"))
def test_ex_create_tag_key_ALLPARAMS(self):
    # Create a tag key with every optional argument supplied, using the
    # "ALLPARAMS" mock response type.
    # The org id is resolved before the mock type is switched (presumably so
    # that lookup is served by the default responses - TODO confirm).
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "ALLPARAMS"
    outcome = self.driver.ex_create_tag_key(
        "MyTestKey",
        description="Test Key Desc.",
        value_required=False,
        display_on_report=False,
    )
    self.assertTrue(outcome)
def test_ex_create_tag_key_BADREQUEST(self):
    # With the "BADREQUEST" mock type active, creating a tag key must raise
    # DimensionDataAPIException.
    # The org id is resolved before the mock type is switched (presumably so
    # that lookup is served by the default responses - TODO confirm).
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "BADREQUEST"
    key_name = "MyTestKey"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_create_tag_key(key_name)
def test_ex_list_tag_keys(self):
    # Listing tag keys returns a list of DimensionDataTagKey objects whose
    # id is a str.
    tag_keys = self.driver.ex_list_tag_keys()
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(tag_keys, list)
    self.assertIsInstance(tag_keys[0], DimensionDataTagKey)
    self.assertIsInstance(tag_keys[0].id, str)
def test_ex_list_tag_keys_ALLFILTERS(self):
    # List tag keys with every filter supplied, using the "ALLFILTERS" mock
    # response type. Passes as long as the call does not raise.
    # The org id is resolved before the mock type is switched (presumably so
    # that lookup is served by the default responses - TODO confirm).
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "ALLFILTERS"
    filters = dict(
        id="fake_id",
        name="fake_name",
        value_required=False,
        display_on_report=False,
    )
    self.driver.ex_list_tag_keys(**filters)
def test_ex_get_tag_by_id(self):
    # Fetching a tag key by id returns a DimensionDataTagKey.
    tag = self.driver.ex_get_tag_key_by_id("d047c609-93d7-4bc5-8fc9-732c85840075")
    # assertIsInstance reports the offending type when the check fails.
    self.assertIsInstance(tag, DimensionDataTagKey)
def test_ex_get_tag_by_id_NOEXIST(self):
    # With the "NOEXIST" mock type active, fetching a tag key by id must
    # raise DimensionDataAPIException.
    # The org id is resolved before the mock type is switched (presumably so
    # that lookup is served by the default responses - TODO confirm).
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "NOEXIST"
    key_id = "d047c609-93d7-4bc5-8fc9-732c85840075"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_get_tag_key_by_id(key_id)
def test_ex_get_tag_by_name(self):
    # With the "SINGLE" mock type active, fetching a tag key by name returns
    # a DimensionDataTagKey.
    # The org id is resolved before the mock type is switched (presumably so
    # that lookup is served by the default responses - TODO confirm).
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "SINGLE"
    tag = self.driver.ex_get_tag_key_by_name("LibcloudTest")
    # assertIsInstance reports the offending type when the check fails.
    self.assertIsInstance(tag, DimensionDataTagKey)
def test_ex_get_tag_by_name_NOEXIST(self):
    # With the default mock type, looking up this tag key name must raise
    # ValueError.
    key_name = "LibcloudTest"
    with self.assertRaises(ValueError):
        self.driver.ex_get_tag_key_by_name(key_name)
def test_ex_modify_tag_key_NAME(self):
    # Rename the first listed tag key, using the "NAME" mock response type.
    first_key = self.driver.ex_list_tag_keys()[0]
    # Switch the mock type only after the tag key listing above.
    DimensionDataMockHttp.type = "NAME"
    self.assertTrue(self.driver.ex_modify_tag_key(first_key, name="NewName"))
def test_ex_modify_tag_key_NOTNAME(self):
    # Modify every attribute except the name on the first listed tag key,
    # using the "NOTNAME" mock response type.
    first_key = self.driver.ex_list_tag_keys()[0]
    # Switch the mock type only after the tag key listing above.
    DimensionDataMockHttp.type = "NOTNAME"
    outcome = self.driver.ex_modify_tag_key(
        first_key,
        description="NewDesc",
        value_required=False,
        display_on_report=True,
    )
    self.assertTrue(outcome)
def test_ex_modify_tag_key_NOCHANGE(self):
    # With the "NOCHANGE" mock type active, modifying a tag key without any
    # changes must raise DimensionDataAPIException.
    first_key = self.driver.ex_list_tag_keys()[0]
    # Switch the mock type only after the tag key listing above.
    DimensionDataMockHttp.type = "NOCHANGE"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_modify_tag_key(first_key)
def test_ex_remove_tag_key(self):
    # Removing the first listed tag key must report a truthy result.
    first_key = self.driver.ex_list_tag_keys()[0]
    self.assertTrue(self.driver.ex_remove_tag_key(first_key))
def test_ex_remove_tag_key_NOEXIST(self):
    # With the "NOEXIST" mock type active, removing a tag key must raise
    # DimensionDataAPIException.
    first_key = self.driver.ex_list_tag_keys()[0]
    # Switch the mock type only after the tag key listing above.
    DimensionDataMockHttp.type = "NOEXIST"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_remove_tag_key(first_key)
def test_ex_apply_tag_to_asset(self):
    # Applying a tag with a value to the first listed node must report a
    # truthy result.
    first_node = self.driver.list_nodes()[0]
    outcome = self.driver.ex_apply_tag_to_asset(first_node, "TagKeyName", "FakeValue")
    self.assertTrue(outcome)
def test_ex_apply_tag_to_asset_NOVALUE(self):
    # Apply a tag without a value to the first listed node, using the
    # "NOVALUE" mock response type.
    first_node = self.driver.list_nodes()[0]
    # Switch the mock type only after the node listing above.
    DimensionDataMockHttp.type = "NOVALUE"
    self.assertTrue(self.driver.ex_apply_tag_to_asset(first_node, "TagKeyName"))
def test_ex_apply_tag_to_asset_NOTAGKEY(self):
    # With the "NOTAGKEY" mock type active, applying a tag must raise
    # DimensionDataAPIException.
    first_node = self.driver.list_nodes()[0]
    # Switch the mock type only after the node listing above.
    DimensionDataMockHttp.type = "NOTAGKEY"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_apply_tag_to_asset(first_node, "TagKeyNam")
def test_ex_apply_tag_to_asset_BADASSETTYPE(self):
    # Applying a tag to a network object (rather than a node) must raise
    # TypeError.
    first_network = self.driver.list_networks()[0]
    # NOTE(review): the "NOTAGKEY" mock type is reused here; presumably the
    # TypeError is raised before any request is made - confirm.
    DimensionDataMockHttp.type = "NOTAGKEY"
    with self.assertRaises(TypeError):
        self.driver.ex_apply_tag_to_asset(first_network, "TagKeyNam")
def test_ex_remove_tag_from_asset(self):
    # Removing a tag from the first listed node must report a truthy result.
    first_node = self.driver.list_nodes()[0]
    self.assertTrue(self.driver.ex_remove_tag_from_asset(first_node, "TagKeyName"))
def test_ex_remove_tag_from_asset_NOTAG(self):
    # With the "NOTAG" mock type active, removing a tag must raise
    # DimensionDataAPIException.
    first_node = self.driver.list_nodes()[0]
    # Switch the mock type only after the node listing above.
    DimensionDataMockHttp.type = "NOTAG"
    with self.assertRaises(DimensionDataAPIException):
        self.driver.ex_remove_tag_from_asset(first_node, "TagKeyNam")
def test_ex_list_tags(self):
    # Listing tags returns a list of three DimensionDataTag objects.
    tags = self.driver.ex_list_tags()
    # assertIsInstance / assertEqual report the actual type and length on
    # failure, unlike assertTrue on a precomputed boolean.
    self.assertIsInstance(tags, list)
    self.assertIsInstance(tags[0], DimensionDataTag)
    self.assertEqual(len(tags), 3)
def test_ex_list_tags_ALLPARAMS(self):
    # List tags with every filter supplied, using the "ALLPARAMS" mock
    # response type; expect a list of three DimensionDataTag objects.
    # The org id is resolved before the mock type is switched (presumably so
    # that lookup is served by the default responses - TODO confirm).
    self.driver.connection._get_orgId()
    DimensionDataMockHttp.type = "ALLPARAMS"
    tags = self.driver.ex_list_tags(
        asset_id="fake_asset_id",
        asset_type="fake_asset_type",
        location="fake_location",
        tag_key_name="fake_tag_key_name",
        tag_key_id="fake_tag_key_id",
        value="fake_value",
        value_required=False,
        display_on_report=False,
    )
    # assertIsInstance / assertEqual report the actual type and length on
    # failure, unlike assertTrue on a precomputed boolean.
    self.assertIsInstance(tags, list)
    self.assertIsInstance(tags[0], DimensionDataTag)
    self.assertEqual(len(tags), 3)
def test_priv_location_to_location_id(self):
    # A location object fetched for "NA9" converts to the id "NA9".
    na9 = self.driver.ex_get_location_by_id("NA9")
    converted = self.driver._location_to_location_id(na9)
    self.assertEqual(converted, "NA9")
def test_priv_location_to_location_id_STR(self):
    # A location id given as a string is returned unchanged.
    converted = self.driver._location_to_location_id("NA9")
    self.assertEqual(converted, "NA9")
def test_priv_location_to_location_id_TYPEERROR(self):
    # A list is not an accepted location value and must raise TypeError.
    not_a_location = [1, 2, 3]
    with self.assertRaises(TypeError):
        self.driver._location_to_location_id(not_a_location)
def test_priv_image_needs_auth_os_img(self):
    # The second listed image, passed as an object, is reported as needing
    # auth.
    second_image = self.driver.list_images()[1]
    needs_auth = self.driver._image_needs_auth(second_image)
    self.assertTrue(needs_auth)
def test_priv_image_needs_auth_os_img_STR(self):
    # The second listed image, passed by id, is reported as needing auth.
    second_image_id = self.driver.list_images()[1].id
    needs_auth = self.driver._image_needs_auth(second_image_id)
    self.assertTrue(needs_auth)
def test_priv_image_needs_auth_cust_img_windows(self):
    # The second listed customer image, passed as an object, is reported as
    # needing auth.
    second_image = self.driver.ex_list_customer_images()[1]
    needs_auth = self.driver._image_needs_auth(second_image)
    self.assertTrue(needs_auth)
def test_priv_image_needs_auth_cust_img_windows_STR(self):
    # The second listed customer image, passed by id, is reported as needing
    # auth.
    second_image_id = self.driver.ex_list_customer_images()[1].id
    needs_auth = self.driver._image_needs_auth(second_image_id)
    self.assertTrue(needs_auth)
def test_priv_image_needs_auth_cust_img_linux(self):
    # The first listed customer image, passed as an object, is reported as
    # not needing auth.
    image = self.driver.ex_list_customer_images()[0]
    # assertFalse states the expectation directly instead of negating inside
    # assertTrue.
    self.assertFalse(self.driver._image_needs_auth(image))
def test_priv_image_needs_auth_cust_img_linux_STR(self):
    # The first listed customer image, passed by id, is reported as not
    # needing auth.
    image = self.driver.ex_list_customer_images()[0].id
    # assertFalse states the expectation directly instead of negating inside
    # assertTrue.
    self.assertFalse(self.driver._image_needs_auth(image))
def test_summary_usage_report(self):
    # The summary usage report for the given date range has 13 entries, the
    # first of which has 6 items.
    rows = self.driver.ex_summary_usage_report("2016-06-01", "2016-06-30")
    self.assertEqual(len(rows), 13)
    self.assertEqual(len(rows[0]), 6)
def test_detailed_usage_report(self):
    # The detailed usage report for the given date range has 42 entries, the
    # first of which has 4 items.
    rows = self.driver.ex_detailed_usage_report("2016-06-01", "2016-06-30")
    self.assertEqual(len(rows), 42)
    self.assertEqual(len(rows[0]), 4)
def test_audit_log_report(self):
    # The audit log report for the given date range has 25 entries; item 2
    # of entry 2 is "OEC_SYSTEM".
    rows = self.driver.ex_audit_log_report("2016-06-01", "2016-06-30")
    self.assertEqual(len(rows), 25)
    self.assertEqual(rows[2][2], "OEC_SYSTEM")
def test_ex_list_ip_address_list(self):
    # List the IP address lists of the first network domain: expect four
    # entries with string metadata, and one child list on the second entry.
    net_domain = self.driver.ex_list_network_domains()[0]
    ip_list = self.driver.ex_list_ip_address_list(ex_network_domain=net_domain)
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(ip_list, list)
    self.assertEqual(len(ip_list), 4)
    self.assertIsInstance(ip_list[0].name, str)
    self.assertIsInstance(ip_list[0].description, str)
    self.assertIsInstance(ip_list[0].ip_version, str)
    self.assertIsInstance(ip_list[0].state, str)
    self.assertIsInstance(ip_list[0].create_time, str)
    self.assertIsInstance(ip_list[0].child_ip_address_lists, list)
    self.assertEqual(len(ip_list[1].child_ip_address_lists), 1)
    self.assertIsInstance(ip_list[1].child_ip_address_lists[0].name, str)
def test_ex_get_ip_address_list(self):
    # Fetch IP address lists by name, using the "FILTERBYNAME" mock response
    # type: expect a single entry holding three addresses.
    net_domain = self.driver.ex_list_network_domains()[0]
    # Switch the mock type only after the network domain listing above.
    DimensionDataMockHttp.type = "FILTERBYNAME"
    ip_list = self.driver.ex_get_ip_address_list(
        ex_network_domain=net_domain.id,
        ex_ip_address_list_name="Test_IP_Address_List_3",
    )
    # assertIsInstance reports the offending type when a check fails.
    self.assertIsInstance(ip_list, list)
    self.assertEqual(len(ip_list), 1)
    self.assertIsInstance(ip_list[0].name, str)
    self.assertIsInstance(ip_list[0].description, str)
    self.assertIsInstance(ip_list[0].ip_version, str)
    self.assertIsInstance(ip_list[0].state, str)
    self.assertIsInstance(ip_list[0].create_time, str)
    ips = ip_list[0].ip_address_collection
    self.assertEqual(len(ips), 3)
    self.assertIsInstance(ips[0].begin, str)
    self.assertIsInstance(ips[0].prefix_size, str)
    self.assertIsInstance(ips[2].end, str)
def test_ex_create_ip_address_list_FAIL(self):
    """Calling the create method with only ``ex_network_domain`` must raise TypeError."""
    domain_id = self.driver.ex_list_network_domains()[0].id
    self.assertRaises(
        TypeError,
        self.driver.ex_create_ip_address_list,
        ex_network_domain=domain_id,
    )
def test_ex_create_ip_address_list(self):
    """Create an IP address list, passing the domain and child list as objects."""
    child_list = DimensionDataChildIpAddressList(
        id="0291ef78-4059-4bc1-b433-3f6ad698dc41", name="test_child_ip_addr_list"
    )
    domain = self.driver.ex_list_network_domains()[0]
    # One single address, one begin/end range and one prefix-sized entry.
    addresses = [
        DimensionDataIpAddress(begin="190.2.2.100"),
        DimensionDataIpAddress(begin="190.2.2.106", end="190.2.2.108"),
        DimensionDataIpAddress(begin="190.2.2.0", prefix_size="24"),
    ]
    created = self.driver.ex_create_ip_address_list(
        ex_network_domain=domain,
        name="Test_IP_Address_List_3",
        ip_version="IPV4",
        description="Test Description",
        ip_address_collection=addresses,
        child_ip_address_list=child_list,
    )
    self.assertTrue(created)
def test_ex_create_ip_address_list_STR(self):
    """Create an IP address list, passing the domain and child list as id strings."""
    domain = self.driver.ex_list_network_domains()[0]
    # One single address, one begin/end range and one prefix-sized entry.
    addresses = [
        DimensionDataIpAddress(begin="190.2.2.100"),
        DimensionDataIpAddress(begin="190.2.2.106", end="190.2.2.108"),
        DimensionDataIpAddress(begin="190.2.2.0", prefix_size="24"),
    ]
    created = self.driver.ex_create_ip_address_list(
        ex_network_domain=domain.id,
        name="Test_IP_Address_List_3",
        ip_version="IPV4",
        description="Test Description",
        ip_address_collection=addresses,
        child_ip_address_list="0291ef78-4059-4bc1-b433-3f6ad698dc41",
    )
    self.assertTrue(created)
def test_ex_edit_ip_address_list(self):
    """Edit an IP address list that is passed to the driver as an object."""
    addresses = [DimensionDataIpAddress(begin="190.2.2.111")]
    child = DimensionDataChildIpAddressList(
        id="2221ef78-4059-4bc1-b433-3f6ad698dc41",
        name="test_child_ip_address_list edited",
    )
    target = DimensionDataIpAddressList(
        id="1111ef78-4059-4bc1-b433-3f6ad698d111",
        name="test ip address list edited",
        ip_version="IPv4",
        description="test",
        ip_address_collection=addresses,
        child_ip_address_lists=child,
        state="NORMAL",
        create_time="2015-09-29T02:49:45",
    )
    edited = self.driver.ex_edit_ip_address_list(
        ex_ip_address_list=target,
        description="test ip address list",
        ip_address_collection=addresses,
        child_ip_address_lists=child,
    )
    self.assertTrue(edited)
def test_ex_edit_ip_address_list_STR(self):
    """Edit an IP address list that is passed to the driver as an id string."""
    ip_address_1 = DimensionDataIpAddress(begin="190.2.2.111")
    ip_address_collection = [ip_address_1]
    child_ip_address_list = DimensionDataChildIpAddressList(
        id="2221ef78-4059-4bc1-b433-3f6ad698dc41",
        name="test_child_ip_address_list edited",
    )
    success = self.driver.ex_edit_ip_address_list(
        # The id previously contained a stray space ("595d- 436e"), which made
        # it a malformed UUID.
        ex_ip_address_list="84e34850-595d-436e-a885-7cd37edb24a4",
        description="test ip address list",
        ip_address_collection=ip_address_collection,
        child_ip_address_lists=child_ip_address_list,
    )
    self.assertTrue(success)
def test_ex_delete_ip_address_list(self):
    """Delete an IP address list that is passed to the driver as an object."""
    child = DimensionDataChildIpAddressList(
        id="2221ef78-4059-4bc1-b433-3f6ad698dc41",
        name="test_child_ip_address_list edited",
    )
    doomed = DimensionDataIpAddressList(
        id="1111ef78-4059-4bc1-b433-3f6ad698d111",
        name="test ip address list edited",
        ip_version="IPv4",
        description="test",
        ip_address_collection=None,
        child_ip_address_lists=child,
        state="NORMAL",
        create_time="2015-09-29T02:49:45",
    )
    self.assertTrue(self.driver.ex_delete_ip_address_list(ex_ip_address_list=doomed))
def test_ex_delete_ip_address_list_STR(self):
    """Delete an IP address list that is passed to the driver as an id string."""
    success = self.driver.ex_delete_ip_address_list(
        # The id previously had only 7 hex digits in its first group; it now
        # matches the well-formed id used by the sibling IP address list tests.
        ex_ip_address_list="1111ef78-4059-4bc1-b433-3f6ad698d111"
    )
    self.assertTrue(success)
def test_ex_list_portlist(self):
    """List the port lists of the first network domain and check their types.

    ``assertIsInstance`` is used instead of ``assertTrue(isinstance(...))`` so
    that a failure reports the offending value and the expected type.
    """
    net_domain = self.driver.ex_list_network_domains()[0]
    portlist = self.driver.ex_list_portlist(ex_network_domain=net_domain)
    self.assertIsInstance(portlist, list)
    self.assertEqual(len(portlist), 3)
    first = portlist[0]
    self.assertIsInstance(first.name, str)
    self.assertIsInstance(first.description, str)
    self.assertIsInstance(first.state, str)
    self.assertIsInstance(first.port_collection, list)
    self.assertIsInstance(first.port_collection[0].begin, str)
    self.assertIsInstance(first.port_collection[0].end, str)
    self.assertIsInstance(first.child_portlist_list, list)
    self.assertIsInstance(first.child_portlist_list[0].id, str)
    self.assertIsInstance(first.child_portlist_list[0].name, str)
    self.assertIsInstance(first.create_time, str)
def test_ex_get_port_list(self):
    """Fetch a single port list by id and check the types of its fields.

    ``assertIsInstance`` is used instead of ``assertTrue(isinstance(...))`` so
    that a failure reports the offending value and the expected type.
    """
    net_domain = self.driver.ex_list_network_domains()[0]
    portlist_id = self.driver.ex_list_portlist(ex_network_domain=net_domain)[0].id
    portlist = self.driver.ex_get_portlist(ex_portlist_id=portlist_id)
    self.assertIsInstance(portlist, DimensionDataPortList)
    self.assertIsInstance(portlist.name, str)
    self.assertIsInstance(portlist.description, str)
    self.assertIsInstance(portlist.state, str)
    self.assertIsInstance(portlist.port_collection, list)
    self.assertIsInstance(portlist.port_collection[0].begin, str)
    self.assertIsInstance(portlist.port_collection[0].end, str)
    self.assertIsInstance(portlist.child_portlist_list, list)
    self.assertIsInstance(portlist.child_portlist_list[0].id, str)
    self.assertIsInstance(portlist.child_portlist_list[0].name, str)
    self.assertIsInstance(portlist.create_time, str)
def test_ex_get_portlist_STR(self):
    """Fetch a port list using the id taken from a listed port list object.

    ``assertIsInstance`` is used instead of ``assertTrue(isinstance(...))`` so
    that a failure reports the offending value and the expected type.
    """
    net_domain = self.driver.ex_list_network_domains()[0]
    portlist = self.driver.ex_list_portlist(ex_network_domain=net_domain)[0]
    port_list = self.driver.ex_get_portlist(ex_portlist_id=portlist.id)
    self.assertIsInstance(port_list, DimensionDataPortList)
    self.assertIsInstance(port_list.name, str)
    self.assertIsInstance(port_list.description, str)
    self.assertIsInstance(port_list.state, str)
    self.assertIsInstance(port_list.port_collection, list)
    self.assertIsInstance(port_list.port_collection[0].begin, str)
    self.assertIsInstance(port_list.port_collection[0].end, str)
    self.assertIsInstance(port_list.child_portlist_list, list)
    self.assertIsInstance(port_list.child_portlist_list[0].id, str)
    self.assertIsInstance(port_list.child_portlist_list[0].name, str)
    self.assertIsInstance(port_list.create_time, str)
def test_ex_create_portlist_NOCHILDPORTLIST(self):
    """Create a port list from one port and one port range, without child lists."""
    name = "Test_Port_List"
    description = "Test Description"
    net_domain = self.driver.ex_list_network_domains()[0]
    port_1 = DimensionDataPort(begin="8080")
    # A port range is modelled with DimensionDataPort as well; the test used
    # to build it with DimensionDataIpAddress by mistake.
    port_2 = DimensionDataPort(begin="8899", end="9023")
    port_collection = [port_1, port_2]
    # Create Port List
    success = self.driver.ex_create_portlist(
        ex_network_domain=net_domain,
        name=name,
        description=description,
        port_collection=port_collection,
    )
    self.assertTrue(success)
def test_ex_create_portlist(self):
    """Create a port list with child port lists passed as objects."""
    name = "Test_Port_List"
    description = "Test Description"
    net_domain = self.driver.ex_list_network_domains()[0]
    port_1 = DimensionDataPort(begin="8080")
    # A port range is modelled with DimensionDataPort as well; the test used
    # to build it with DimensionDataIpAddress by mistake.
    port_2 = DimensionDataPort(begin="8899", end="9023")
    port_collection = [port_1, port_2]
    child_port_1 = DimensionDataChildPortList(
        id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
    )
    child_port_2 = DimensionDataChildPortList(
        id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
    )
    child_ports = [child_port_1, child_port_2]
    # Create Port List
    success = self.driver.ex_create_portlist(
        ex_network_domain=net_domain,
        name=name,
        description=description,
        port_collection=port_collection,
        child_portlist_list=child_ports,
    )
    self.assertTrue(success)
def test_ex_create_portlist_STR(self):
    """Create a port list with the domain and child port lists passed as ids."""
    name = "Test_Port_List"
    description = "Test Description"
    net_domain = self.driver.ex_list_network_domains()[0]
    port_1 = DimensionDataPort(begin="8080")
    # A port range is modelled with DimensionDataPort as well; the test used
    # to build it with DimensionDataIpAddress by mistake.
    port_2 = DimensionDataPort(begin="8899", end="9023")
    port_collection = [port_1, port_2]
    child_port_1 = DimensionDataChildPortList(
        id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
    )
    child_port_2 = DimensionDataChildPortList(
        id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
    )
    child_ports_ids = [child_port_1.id, child_port_2.id]
    # Create Port List
    success = self.driver.ex_create_portlist(
        ex_network_domain=net_domain.id,
        name=name,
        description=description,
        port_collection=port_collection,
        child_portlist_list=child_ports_ids,
    )
    self.assertTrue(success)
def test_ex_edit_portlist(self):
    """Edit a port list that is passed to the driver as an object."""
    net_domain = self.driver.ex_list_network_domains()[0]
    portlist = self.driver.ex_list_portlist(net_domain)[0]
    description = "Test Description"
    port_1 = DimensionDataPort(begin="8080")
    # A port range is modelled with DimensionDataPort as well; the test used
    # to build it with DimensionDataIpAddress by mistake.
    port_2 = DimensionDataPort(begin="8899", end="9023")
    port_collection = [port_1, port_2]
    child_port_1 = DimensionDataChildPortList(
        id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
    )
    child_port_2 = DimensionDataChildPortList(
        id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
    )
    # The child port lists are handed over by id.
    child_ports = [child_port_1.id, child_port_2.id]
    # Edit Port List
    success = self.driver.ex_edit_portlist(
        ex_portlist=portlist,
        description=description,
        port_collection=port_collection,
        child_portlist_list=child_ports,
    )
    self.assertTrue(success)
def test_ex_edit_portlist_STR(self):
    """Edit a port list that is passed to the driver as an id string."""
    portlist_id = "484174a2-ae74-4658-9e56-50fc90e086cf"
    description = "Test Description"
    port_1 = DimensionDataPort(begin="8080")
    # A port range is modelled with DimensionDataPort as well; the test used
    # to build it with DimensionDataIpAddress by mistake.
    port_2 = DimensionDataPort(begin="8899", end="9023")
    port_collection = [port_1, port_2]
    child_port_1 = DimensionDataChildPortList(
        id="333174a2-ae74-4658-9e56-50fc90e086cf", name="test port 1"
    )
    child_port_2 = DimensionDataChildPortList(
        id="311174a2-ae74-4658-9e56-50fc90e04444", name="test port 2"
    )
    child_ports_ids = [child_port_1.id, child_port_2.id]
    # Edit Port List
    success = self.driver.ex_edit_portlist(
        ex_portlist=portlist_id,
        description=description,
        port_collection=port_collection,
        child_portlist_list=child_ports_ids,
    )
    self.assertTrue(success)
def test_ex_delete_portlist(self):
    """Delete the first listed port list, passing it as an object."""
    domain = self.driver.ex_list_network_domains()[0]
    first_portlist = self.driver.ex_list_portlist(domain)[0]
    deleted = self.driver.ex_delete_portlist(ex_portlist=first_portlist)
    self.assertTrue(deleted)
def test_ex_delete_portlist_STR(self):
    """Delete the first listed port list, passing only its id."""
    domain = self.driver.ex_list_network_domains()[0]
    first_portlist = self.driver.ex_list_portlist(domain)[0]
    deleted = self.driver.ex_delete_portlist(ex_portlist=first_portlist.id)
    self.assertTrue(deleted)
def test_import_image(self):
    """Import an image into a cluster, with one valued and one value-less tag."""
    tags = {"tagkey1_name": "dev test", "tagkey2_name": None}
    import_kwargs = {
        "ovf_package_name": "aTestGocToNGoc2_export2.mf",
        "name": "Libcloud NGOCImage_New 2",
        "description": "test",
        "cluster_id": "QA1_N2_VMWARE_1-01",
        "is_guest_os_customization": "false",
        "tagkey_name_value_dictionaries": tags,
    }
    imported = self.driver.import_image(**import_kwargs)
    self.assertTrue(imported)
def test_import_image_error_too_many_choice(self):
    """Passing both ``cluster_id`` and ``datacenter_id`` must raise ValueError."""
    tags = {"tagkey1_name": "dev test", "tagkey2_name": None}
    self.assertRaises(
        ValueError,
        self.driver.import_image,
        ovf_package_name="aTestGocToNGoc2_export2.mf",
        name="Libcloud NGOCImage_New 2",
        description="test",
        cluster_id="QA1_N2_VMWARE_1-01",
        datacenter_id="QA1_N1_VMWARE_1",
        is_guest_os_customization="false",
        tagkey_name_value_dictionaries=tags,
    )
def test_import_image_error_missing_choice(self):
    """Passing neither ``cluster_id`` nor ``datacenter_id`` must raise ValueError."""
    tags = {"tagkey1_name": "dev test", "tagkey2_name": None}
    self.assertRaises(
        ValueError,
        self.driver.import_image,
        ovf_package_name="aTestGocToNGoc2_export2.mf",
        name="Libcloud NGOCImage_New 2",
        description="test",
        cluster_id=None,
        datacenter_id=None,
        is_guest_os_customization="false",
        tagkey_name_value_dictionaries=tags,
    )
def test_exchange_nic_vlans(self):
    """Exchange the VLANs of two NICs identified by id."""
    first_nic = "a4b4b42b-ccb5-416f-b052-ce7cb7fdff12"
    second_nic = "b39d09b8-ea65-424a-8fa6-c6f5a98afc69"
    exchanged = self.driver.ex_exchange_nic_vlans(nic_id_1=first_nic, nic_id_2=second_nic)
    self.assertTrue(exchanged)
def test_change_nic_network_adapter(self):
    """Change the network adapter of a NIC to ``E1000``."""
    nic = "0c55c269-20a5-4fec-8054-22a245a48fe4"
    changed = self.driver.ex_change_nic_network_adapter(
        nic_id=nic,
        network_adapter_name="E1000",
    )
    self.assertTrue(changed)
def test_ex_create_node_uncustomized_mcp2_using_vlan(self):
    """Create an uncustomized node whose primary NIC is given as a VLAN object."""
    primary_vlan = self.driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
    # The VLAN is supplied instead of a private IPv4 address; every other
    # optional argument is passed explicitly as None.
    create_kwargs = {
        "name": "test_server_05",
        "image": "fake_customer_image",
        "ex_network_domain": "fakenetworkdomain",
        "ex_is_started": False,
        "ex_description": None,
        "ex_cluster_id": None,
        "ex_cpu_specification": None,
        "ex_memory_gb": None,
        "ex_primary_nic_private_ipv4": None,
        "ex_primary_nic_vlan": primary_vlan,
        "ex_primary_nic_network_adapter": None,
        "ex_additional_nics": None,
        "ex_disks": None,
        "ex_tagid_value_pairs": None,
        "ex_tagname_value_pairs": None,
    }
    created = self.driver.ex_create_node_uncustomized(**create_kwargs)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
def test_ex_create_node_uncustomized_mcp2_using_ipv4(self):
    """Create an uncustomized node whose primary NIC is given as a private IPv4."""
    # The private IPv4 address is supplied instead of a VLAN; every other
    # optional argument is passed explicitly as None.
    create_kwargs = {
        "name": "test_server_05",
        "image": "fake_customer_image",
        "ex_network_domain": "fakenetworkdomain",
        "ex_is_started": False,
        "ex_description": None,
        "ex_cluster_id": None,
        "ex_cpu_specification": None,
        "ex_memory_gb": None,
        "ex_primary_nic_private_ipv4": "10.0.0.1",
        "ex_primary_nic_vlan": None,
        "ex_primary_nic_network_adapter": None,
        "ex_additional_nics": None,
        "ex_disks": None,
        "ex_tagid_value_pairs": None,
        "ex_tagname_value_pairs": None,
    }
    created = self.driver.ex_create_node_uncustomized(**create_kwargs)
    self.assertEqual(created.id, "e75ead52-692f-4314-8725-c8a4f4d13a87")
class InvalidRequestError(Exception):
    """Raised by the mock handlers when a request's root XML tag is not the expected one."""

    def __init__(self, tag):
        """Build the error message from the offending *tag*."""
        message = "Invalid Request - %s" % tag
        super().__init__(message)
class DimensionDataMockHttp(MockHttp):
fixtures = ComputeFileFixtures("dimensiondata")
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_report_usage(
    self, method, url, body, headers
):
    """Serve the ``summary_usage_report.csv`` fixture with status 400."""
    report_csv = self.fixtures.load("summary_usage_report.csv")
    # NOTE(review): the status is BAD_REQUEST while the reason phrase is the
    # one for OK; kept exactly as the handler has always answered.
    reason = httplib.responses[httplib.OK]
    return (httplib.BAD_REQUEST, report_csv, {}, reason)
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_report_usageDetailed(
    self, method, url, body, headers
):
    """Serve the ``detailed_usage_report.csv`` fixture with status 400."""
    report_csv = self.fixtures.load("detailed_usage_report.csv")
    # NOTE(review): the status is BAD_REQUEST while the reason phrase is the
    # one for OK; kept exactly as the handler has always answered.
    reason = httplib.responses[httplib.OK]
    return (httplib.BAD_REQUEST, report_csv, {}, reason)
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_auditlog(self, method, url, body, headers):
    """Serve the ``audit_log.csv`` fixture with status 400."""
    audit_csv = self.fixtures.load("audit_log.csv")
    # NOTE(review): the status is BAD_REQUEST while the reason phrase is the
    # one for OK; kept exactly as the handler has always answered.
    reason = httplib.responses[httplib.OK]
    return (httplib.BAD_REQUEST, audit_csv, {}, reason)
def _oec_0_9_myaccount_UNAUTHORIZED(self, method, url, body, headers):
    """Answer the account lookup with status 401 and an empty body."""
    denied = httplib.UNAUTHORIZED
    return (denied, "", {}, httplib.responses[denied])
def _oec_0_9_myaccount(self, method, url, body, headers):
    """Serve the ``oec_0_9_myaccount.xml`` fixture with status 200."""
    account_xml = self.fixtures.load("oec_0_9_myaccount.xml")
    ok = httplib.OK
    return (ok, account_xml, {}, httplib.responses[ok])
def _oec_0_9_myaccount_INPROGRESS(self, method, url, body, headers):
    """Serve the ``oec_0_9_myaccount.xml`` fixture with status 200 (INPROGRESS variant)."""
    account_xml = self.fixtures.load("oec_0_9_myaccount.xml")
    ok = httplib.OK
    return (ok, account_xml, {}, httplib.responses[ok])
def _oec_0_9_myaccount_PAGINATED(self, method, url, body, headers):
    """Serve the ``oec_0_9_myaccount.xml`` fixture with status 200 (PAGINATED variant)."""
    account_xml = self.fixtures.load("oec_0_9_myaccount.xml")
    ok = httplib.OK
    return (ok, account_xml, {}, httplib.responses[ok])
def _oec_0_9_myaccount_ALLFILTERS(self, method, url, body, headers):
    """Serve the ``oec_0_9_myaccount.xml`` fixture with status 200 (ALLFILTERS variant)."""
    account_xml = self.fixtures.load("oec_0_9_myaccount.xml")
    ok = httplib.OK
    return (ok, account_xml, {}, httplib.responses[ok])
def _oec_0_9_base_image(self, method, url, body, headers):
    """Serve the ``oec_0_9_base_image.xml`` fixture with status 200."""
    images_xml = self.fixtures.load("oec_0_9_base_image.xml")
    ok = httplib.OK
    return (ok, images_xml, {}, httplib.responses[ok])
def _oec_0_9_base_imageWithDiskSpeed(self, method, url, body, headers):
    """Serve the ``oec_0_9_base_imageWithDiskSpeed.xml`` fixture with status 200."""
    images_xml = self.fixtures.load("oec_0_9_base_imageWithDiskSpeed.xml")
    ok = httplib.OK
    return (ok, images_xml, {}, httplib.responses[ok])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployed(
    self, method, url, body, headers
):
    """Serve the deployed-servers fixture with status 200."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployed.xml"
    servers_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, servers_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_pendingDeploy(
    self, method, url, body, headers
):
    """Serve the pending-deploy servers fixture with status 200."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_pendingDeploy.xml"
    servers_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, servers_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_datacenter(self, method, url, body, headers):
    """Serve the datacenter fixture with status 200."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_datacenter.xml"
    datacenters_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, datacenters_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11(self, method, url, body, headers):
    """Serve the fixture matching the action given after the last ``?`` of *url*.

    Recognised actions are restart, shutdown, delete, start and poweroff; for
    anything else the response body is ``None``. The status is always 200.
    """
    fixture_by_action = {
        "restart": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_restart.xml",
        "shutdown": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_shutdown.xml",
        "delete": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_delete.xml",
        "start": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_start.xml",
        "poweroff": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_poweroff.xml",
    }
    requested_action = url.split("?")[-1]
    fixture_name = fixture_by_action.get(requested_action)
    # Nothing is loaded for an unrecognised action.
    payload = None if fixture_name is None else self.fixtures.load(fixture_name)
    return (httplib.OK, payload, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_INPROGRESS(
    self, method, url, body, headers
):
    """Serve the ``*_INPROGRESS`` fixture matching the action after the last ``?``.

    Recognised actions are restart, shutdown, delete, start and poweroff; for
    anything else the response body is ``None``. The status is always 400.
    """
    fixture_by_action = {
        "restart": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_restart_INPROGRESS.xml",
        "shutdown": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_shutdown_INPROGRESS.xml",
        "delete": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_delete_INPROGRESS.xml",
        "start": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_start_INPROGRESS.xml",
        "poweroff": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_11_poweroff_INPROGRESS.xml",
    }
    requested_action = url.split("?")[-1]
    fixture_name = fixture_by_action.get(requested_action)
    # Nothing is loaded for an unrecognised action.
    payload = None if fixture_name is None else self.fixtures.load(fixture_name)
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, payload, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server(self, method, url, body, headers):
    """Serve the server fixture (its file name starts with an underscore) with status 200."""
    fixture_name = "_oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server.xml"
    server_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, server_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation(
    self, method, url, body, headers
):
    """Serve the network-with-location fixture with status 200.

    For POST requests the body is parsed first and its root tag must be
    ``NewNetworkWithLocation`` in the OEC network namespace.

    :raises InvalidRequestError: if a POST body has a different root tag.
    """
    expected_tag = "{http://oec.api.opsource.net/schemas/network}NewNetworkWithLocation"
    if method == "POST":
        root_tag = ET.fromstring(body).tag
        if root_tag != expected_tag:
            raise InvalidRequestError(root_tag)
    networks_xml = self.fixtures.load(
        "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation.xml"
    )
    return (httplib.OK, networks_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation_NA9(
    self, method, url, body, headers
):
    """Serve the network-with-location fixture with status 200 (NA9 variant)."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation.xml"
    networks_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, networks_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_4bba37be_506f_11e3_b29c_001517c4643e(
    self, method, url, body, headers
):
    """Serve the fixture for network ``4bba37be-...`` with status 200."""
    fixture_name = (
        "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_4bba37be_506f_11e3_b29c_001517c4643e.xml"
    )
    network_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, network_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSize(
    self, method, url, body, headers
):
    """Serve the disk ``changeSize`` fixture with status 200."""
    fixture_name = (
        "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSize.xml"
    )
    result_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, result_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSpeed(
    self, method, url, body, headers
):
    """Serve the disk ``changeSpeed`` fixture with status 200."""
    fixture_name = (
        "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1_changeSpeed.xml"
    )
    result_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, result_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1(
    self, method, url, body, headers
):
    """Serve the disk fixture when the text after the last ``?`` is ``delete``.

    For any other action the incoming request *body* is returned unchanged.
    The status is always 200.
    """
    requested_action = url.rsplit("?", 1)[-1]
    payload = body
    if requested_action == "delete":
        payload = self.fixtures.load(
            "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_disk_1.xml"
        )
    return (httplib.OK, payload, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87(
    self, method, url, body, headers
):
    """Serve the per-method fixture for server ``e75ead52-...`` with status 200.

    GET answers with the server description fixture and POST with the
    ``_POST`` fixture.

    :raises ValueError: for any other HTTP method. The handler used to fall
        off the end and return ``None`` in that case, which only surfaced
        later as an opaque error when the caller unpacked the response.
    """
    fixture_by_method = {
        "GET": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87.xml",
        "POST": "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_e75ead52_692f_4314_8725_c8a4f4d13a87_POST.xml",
    }
    if method not in fixture_by_method:
        raise ValueError("Unsupported HTTP method for mock server handler: {}".format(method))
    body = self.fixtures.load(fixture_by_method[method])
    return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule(
    self, method, url, body, headers
):
    """Serve the anti-affinity rule ``create`` fixture with status 200."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_create.xml"
    rule_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, rule_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_FAIL_EXISTING(
    self, method, url, body, headers
):
    """Serve the anti-affinity rule ``create_FAIL`` fixture with status 400."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_create_FAIL.xml"
    failure_xml = self.fixtures.load(fixture_name)
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, failure_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_07e3621a_a920_4a9a_943c_d8021f27f418(
    self, method, url, body, headers
):
    """Serve the anti-affinity rule ``delete`` fixture with status 200."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_delete.xml"
    result_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, result_xml, {}, httplib.responses[httplib.OK])
def _oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_07e3621a_a920_4a9a_943c_d8021f27f418_FAIL(
    self, method, url, body, headers
):
    """Serve the anti-affinity rule ``delete_FAIL`` fixture with status 400."""
    fixture_name = "oec_0_9_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_antiAffinityRule_delete_FAIL.xml"
    failure_xml = self.fixtures.load(fixture_name)
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, failure_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server(self, method, url, body, headers):
    """Serve the ``server.xml`` fixture with status 200."""
    server_xml = self.fixtures.load("server.xml")
    ok = httplib.OK
    return (ok, server_xml, {}, httplib.responses[ok])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteServer(
    self, method, url, body, headers
):
    """Check the request is a ``deleteServer`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not ``deleteServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}deleteServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_deleteServer.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteServer_INPROGRESS(
    self, method, url, body, headers
):
    """Check the request is a ``deleteServer`` document, then answer with status 400.

    The response body is the ``server_deleteServer_RESOURCEBUSY.xml`` fixture.

    :raises InvalidRequestError: if the root tag of *body* is not ``deleteServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}deleteServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    busy_xml = self.fixtures.load("server_deleteServer_RESOURCEBUSY.xml")
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, busy_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_rebootServer(
    self, method, url, body, headers
):
    """Check the request is a ``rebootServer`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not ``rebootServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}rebootServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_rebootServer.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_rebootServer_INPROGRESS(
    self, method, url, body, headers
):
    """Check the request is a ``rebootServer`` document, then answer with status 400.

    The response body is the ``server_rebootServer_RESOURCEBUSY.xml`` fixture.

    :raises InvalidRequestError: if the root tag of *body* is not ``rebootServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}rebootServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    busy_xml = self.fixtures.load("server_rebootServer_RESOURCEBUSY.xml")
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, busy_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server(
    self, method, url, body, headers
):
    """Serve the server list with status 200.

    URLs ending in ``datacenterId=NA3`` get the NA3 fixture; every other URL
    gets the generic ``2.4/server_server.xml`` fixture.
    """
    wants_na3 = url.endswith("datacenterId=NA3")
    fixture_name = "2.4/server_server_NA3.xml" if wants_na3 else "2.4/server_server.xml"
    servers_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, servers_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGESIZE50(
    self, method, url, body, headers
):
    """Serve the server list with status 200, insisting that *url* ends in ``pageSize=50``.

    :raises ValueError: if *url* does not end with ``pageSize=50``.
    """
    page_size_ok = url.endswith("pageSize=50")
    if not page_size_ok:
        raise ValueError("pageSize is not set as expected")
    servers_xml = self.fixtures.load("2.4/server_server.xml")
    return (httplib.OK, servers_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_EMPTY(
    self, method, url, body, headers
):
    """Serve the ``server_server_paginated_empty.xml`` fixture with status 200."""
    empty_page_xml = self.fixtures.load("server_server_paginated_empty.xml")
    ok = httplib.OK
    return (ok, empty_page_xml, {}, httplib.responses[ok])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGED_THEN_EMPTY(
    self, method, url, body, headers
):
    """Serve a paginated page, or the empty page once ``pageNumber=2`` appears in *url*.

    The status is 200 in both cases.
    """
    if "pageNumber=2" in url:
        fixture_name = "server_server_paginated_empty.xml"
    else:
        fixture_name = "2.4/server_server_paginated.xml"
    page_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, page_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGINATED(
    self, method, url, body, headers
):
    """Serve the paginated page, or the plain server list once ``pageNumber=2`` is in *url*.

    The status is 200 in both cases.
    """
    if "pageNumber=2" in url:
        fixture_name = "2.4/server_server.xml"
    else:
        fixture_name = "2.4/server_server_paginated.xml"
    page_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, page_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGINATEDEMPTY(
    self, method, url, body, headers
):
    """Serve the ``server_server_paginated_empty.xml`` fixture with status 200."""
    empty_page_xml = self.fixtures.load("server_server_paginated_empty.xml")
    ok = httplib.OK
    return (ok, empty_page_xml, {}, httplib.responses[ok])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_ALLFILTERS(
    self, method, url, body, headers
):
    """Verify every query parameter of *url*, then serve the server list with status 200.

    Each recognised key must carry the exact value listed below (checked with
    ``assert``).

    :raises ValueError: if a query parameter key is not recognised, or if
        *url* does not split into exactly one path and one query part.
    """
    expected_values = {
        "datacenterId": "fake_loc",
        "networkId": "fake_network",
        "networkDomainId": "fake_network_domain",
        "vlanId": "fake_vlan",
        "ipv6": "fake_ipv6",
        "privateIpv4": "fake_ipv4",
        "name": "fake_name",
        "state": "fake_state",
        "started": "True",
        "deployed": "True",
        "sourceImageId": "fake_image",
    }
    _, query = url.split("?")
    for pair in query.split("&"):
        key, value = pair.split("=")
        if key not in expected_values:
            raise ValueError("Could not find in url parameters {}:{}".format(key, value))
        assert value == expected_values[key]
    servers_xml = self.fixtures.load("2.4/server_server.xml")
    return (httplib.OK, servers_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule(
    self, method, url, body, headers
):
    """Serve the ``server_antiAffinityRule_list.xml`` fixture with status 200."""
    rules_xml = self.fixtures.load("server_antiAffinityRule_list.xml")
    ok = httplib.OK
    return (ok, rules_xml, {}, httplib.responses[ok])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule_ALLFILTERS(
    self, method, url, body, headers
):
    """Verify every query parameter of *url*, then serve the rule list with status 200.

    ``id``, ``state`` and ``pageSize`` must carry the exact values listed
    below (checked with ``assert``); ``networkDomainId`` is accepted with any
    value.

    :raises ValueError: if a query parameter key is not recognised, or if
        *url* does not split into exactly one path and one query part.
    """
    expected_values = {"id": "FAKE_ID", "state": "FAKE_STATE", "pageSize": "250"}
    _, query = url.split("?")
    for pair in query.split("&"):
        key, value = pair.split("=")
        if key == "networkDomainId":
            # Present in the request, but its value is not checked.
            continue
        if key not in expected_values:
            raise ValueError("Could not find in url parameters {}:{}".format(key, value))
        assert value == expected_values[key]
    rules_xml = self.fixtures.load("server_antiAffinityRule_list.xml")
    return (httplib.OK, rules_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule_PAGINATED(
    self, method, url, body, headers
):
    """Serve the paginated rule list, or the plain list once ``pageNumber=2`` is in *url*.

    The status is 200 in both cases.
    """
    if "pageNumber=2" in url:
        fixture_name = "server_antiAffinityRule_list.xml"
    else:
        fixture_name = "server_antiAffinityRule_list_PAGINATED.xml"
    rules_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, rules_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_datacenter(
    self, method, url, body, headers
):
    """Serve the datacenter list with status 200.

    URLs ending in ``id=NA9`` get the NA9 fixture; every other URL gets
    ``infrastructure_datacenter.xml``.
    """
    wants_na9 = url.endswith("id=NA9")
    fixture_name = (
        "infrastructure_datacenter_NA9.xml" if wants_na9 else "infrastructure_datacenter.xml"
    )
    datacenters_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, datacenters_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_datacenter_ALLFILTERS(
    self, method, url, body, headers
):
    """Serve the datacenter list with status 200 (ALLFILTERS variant).

    URLs ending in ``id=NA9`` get the NA9 fixture; every other URL gets
    ``infrastructure_datacenter.xml``.
    """
    wants_na9 = url.endswith("id=NA9")
    fixture_name = (
        "infrastructure_datacenter_NA9.xml" if wants_na9 else "infrastructure_datacenter.xml"
    )
    datacenters_xml = self.fixtures.load(fixture_name)
    return (httplib.OK, datacenters_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_updateVmwareTools(
    self, method, url, body, headers
):
    """Check the request is an ``updateVmwareTools`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not
        ``updateVmwareTools`` in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}updateVmwareTools"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_updateVmwareTools.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_startServer(
    self, method, url, body, headers
):
    """Check the request is a ``startServer`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not ``startServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}startServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_startServer.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_startServer_INPROGRESS(
    self, method, url, body, headers
):
    """Check the request is a ``startServer`` document, then answer with status 400.

    The response body is the ``server_startServer_INPROGRESS.xml`` fixture.

    :raises InvalidRequestError: if the root tag of *body* is not ``startServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}startServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    in_progress_xml = self.fixtures.load("server_startServer_INPROGRESS.xml")
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, in_progress_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_shutdownServer(
    self, method, url, body, headers
):
    """Check the request is a ``shutdownServer`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not
        ``shutdownServer`` in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}shutdownServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_shutdownServer.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_shutdownServer_INPROGRESS(
    self, method, url, body, headers
):
    """Check the request is a ``shutdownServer`` document, then answer with status 400.

    The response body is the ``server_shutdownServer_INPROGRESS.xml`` fixture.

    :raises InvalidRequestError: if the root tag of *body* is not
        ``shutdownServer`` in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}shutdownServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    in_progress_xml = self.fixtures.load("server_shutdownServer_INPROGRESS.xml")
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, in_progress_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_resetServer(
    self, method, url, body, headers
):
    """Check the request is a ``resetServer`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not ``resetServer``
        in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}resetServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_resetServer.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_powerOffServer(
    self, method, url, body, headers
):
    """Check the request is a ``powerOffServer`` document, then answer with status 200.

    :raises InvalidRequestError: if the root tag of *body* is not
        ``powerOffServer`` in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}powerOffServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    response_xml = self.fixtures.load("server_powerOffServer.xml")
    return (httplib.OK, response_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_powerOffServer_INPROGRESS(
    self, method, url, body, headers
):
    """Check the request is a ``powerOffServer`` document, then answer with status 400.

    The response body is the ``server_powerOffServer_INPROGRESS.xml`` fixture.

    :raises InvalidRequestError: if the root tag of *body* is not
        ``powerOffServer`` in the ``urn:didata.com:api:cloud:types`` namespace.
    """
    expected_tag = "{urn:didata.com:api:cloud:types}powerOffServer"
    root_tag = ET.fromstring(body).tag
    if root_tag != expected_tag:
        raise InvalidRequestError(root_tag)
    in_progress_xml = self.fixtures.load("server_powerOffServer_INPROGRESS.xml")
    # The reason phrase is the one for OK even though the status is BAD_REQUEST.
    return (httplib.BAD_REQUEST, in_progress_xml, {}, httplib.responses[httplib.OK])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_11_INPROGRESS(
    self, method, url, body, headers
):
    """Reply 400 with the ``2.4/server_GetServer.xml`` fixture as the body.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("2.4/server_GetServer.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain(
    self, method, url, body, headers
):
    """Reply 200 with the ``network_networkDomain.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("network_networkDomain.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_ALLFILTERS(
    self, method, url, body, headers
):
    """Verify every query parameter of a fully-filtered network domain listing.

    Each ``key=value`` pair in the URL must be a known filter carrying its
    expected fake value; an unknown key raises ValueError, a wrong value fails
    the assertion. On success replies 200 with ``network_networkDomain.xml``.
    """
    expected = {
        "datacenterId": "fake_location",
        "type": "fake_plan",
        "name": "fake_name",
        "state": "fake_state",
    }
    _path, query = url.split("?")
    for pair in query.split("&"):
        name, actual = pair.split("=")
        if name not in expected:
            raise ValueError("Could not find in url parameters {}:{}".format(name, actual))
        assert actual == expected[name]
    status = httplib.OK
    payload = self.fixtures.load("network_networkDomain.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan(
    self, method, url, body, headers
):
    """Reply 200 with the ``network_vlan.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("network_vlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan_ALLFILTERS(
    self, method, url, body, headers
):
    """Verify every query parameter of a fully-filtered VLAN listing.

    Each ``key=value`` pair in the URL must be a known filter carrying its
    expected fake value; an unknown key raises ValueError, a wrong value fails
    the assertion. On success replies 200 with ``network_vlan.xml``.
    """
    expected = {
        "datacenterId": "fake_location",
        "networkDomainId": "fake_network_domain",
        "ipv6Address": "fake_ipv6",
        "privateIpv4Address": "fake_ipv4",
        "name": "fake_name",
        "state": "fake_state",
    }
    _path, query = url.split("?")
    for pair in query.split("&"):
        name, actual = pair.split("=")
        if name not in expected:
            raise ValueError("Could not find in url parameters {}:{}".format(name, actual))
        assert actual == expected[name]
    status = httplib.OK
    payload = self.fixtures.load("network_vlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployServer(
    self, method, url, body, headers
):
    """Validate a ``deployServer`` request's network section and reply 200 with its fixture.

    The request must carry exactly one of:
      * a ``network`` element (MCP1) with a ``privateIpv4`` or ``networkId``, or
      * a ``networkInfo`` element (MCP2) whose ``primaryNic`` has a
        ``privateIpv4`` or ``vlanId``.

    Raises InvalidRequestError when the root tag is wrong, when both or neither
    network sections are present, or when the chosen section lacks the
    required addressing information.
    """
    request = ET.fromstring(body)
    if request.tag != "{urn:didata.com:api:cloud:types}deployServer":
        raise InvalidRequestError(request.tag)

    network = request.find(fixxpath("network", TYPES_URN))
    network_info = request.find(fixxpath("networkInfo", TYPES_URN))

    # Check the both/neither cases up front so each branch below only deals
    # with its own section.
    if network is not None and network_info is not None:
        raise InvalidRequestError("Request has both MCP1 and MCP2 values")
    if network is None and network_info is None:
        raise InvalidRequestError(
            "Invalid request, does not have network or network_info in XML"
        )

    if network is not None:
        ipv4 = findtext(network, "privateIpv4", TYPES_URN)
        network_id = findtext(network, "networkId", TYPES_URN)
        if ipv4 is None and network_id is None:
            raise InvalidRequestError(
                "Invalid request MCP1 requests need privateIpv4 or networkId"
            )
    else:
        primary_nic = network_info.find(fixxpath("primaryNic", TYPES_URN))
        # A networkInfo without primaryNic cannot supply either value; report
        # it as an invalid request instead of failing on a None element.
        if primary_nic is None:
            raise InvalidRequestError(
                "Invalid request MCP2 requests need privateIpv4 or vlanId"
            )
        ipv4 = findtext(primary_nic, "privateIpv4", TYPES_URN)
        vlan_id = findtext(primary_nic, "vlanId", TYPES_URN)
        if ipv4 is None and vlan_id is None:
            raise InvalidRequestError(
                "Invalid request MCP2 requests need privateIpv4 or vlanId"
            )

    status = httplib.OK
    payload = self.fixtures.load("server_deployServer.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_e75ead52_692f_4314_8725_c8a4f4d13a87(
    self, method, url, body, headers
):
    """Reply 200 with the 2.4 fixture for server e75ead52-692f-4314-8725-c8a4f4d13a87."""
    status = httplib.OK
    payload = self.fixtures.load("2.4/server_server_e75ead52_692f_4314_8725_c8a4f4d13a87.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deployNetworkDomain(
    self, method, url, body, headers
):
    """Check the request root is ``deployNetworkDomain`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deployNetworkDomain"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_deployNetworkDomain.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for network domain 8cdfd607-f429-4df6-9352-162cfc0891be."""
    status = httplib.OK
    payload = self.fixtures.load("network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be_ALLFILTERS(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for network domain 8cdfd607-f429-4df6-9352-162cfc0891be.

    Unlike the list-level ALLFILTERS handlers, this one does not inspect the URL.
    """
    status = httplib.OK
    payload = self.fixtures.load("network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editNetworkDomain(
    self, method, url, body, headers
):
    """Check the request root is ``editNetworkDomain`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}editNetworkDomain"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_editNetworkDomain.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteNetworkDomain(
    self, method, url, body, headers
):
    """Check the request root is ``deleteNetworkDomain`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deleteNetworkDomain"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_deleteNetworkDomain.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deployVlan(
    self, method, url, body, headers
):
    """Check the request root is ``deployVlan`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deployVlan"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_deployVlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan_0e56433f_d808_4669_821d_812769517ff8(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for VLAN 0e56433f-d808-4669-821d-812769517ff8."""
    status = httplib.OK
    payload = self.fixtures.load("network_vlan_0e56433f_d808_4669_821d_812769517ff8.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editVlan(
    self, method, url, body, headers
):
    """Check the request root is ``editVlan`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}editVlan"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_editVlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteVlan(
    self, method, url, body, headers
):
    """Check the request root is ``deleteVlan`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deleteVlan"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_deleteVlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_expandVlan(
    self, method, url, body, headers
):
    """Check the request root is ``expandVlan`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}expandVlan"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_expandVlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_addPublicIpBlock(
    self, method, url, body, headers
):
    """Check the request root is ``addPublicIpBlock`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}addPublicIpBlock"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_addPublicIpBlock.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock_4487241a_f0ca_11e3_9315_d4bed9b167ba(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for public IP block 4487241a-f0ca-11e3-9315-d4bed9b167ba."""
    status = httplib.OK
    payload = self.fixtures.load("network_publicIpBlock_4487241a_f0ca_11e3_9315_d4bed9b167ba.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock(
    self, method, url, body, headers
):
    """Reply 200 with the ``network_publicIpBlock.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("network_publicIpBlock.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock_9945dc4a_bdce_11e4_8c14_b8ca3a5d9ef8(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for public IP block 9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8."""
    status = httplib.OK
    payload = self.fixtures.load("network_publicIpBlock_9945dc4a_bdce_11e4_8c14_b8ca3a5d9ef8.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_removePublicIpBlock(
    self, method, url, body, headers
):
    """Check the request root is ``removePublicIpBlock`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}removePublicIpBlock"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_removePublicIpBlock.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_firewallRule(
    self, method, url, body, headers
):
    """Reply 200 with the ``network_firewallRule.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("network_firewallRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createFirewallRule(
    self, method, url, body, headers
):
    """Check the request root is ``createFirewallRule`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}createFirewallRule"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_createFirewallRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_firewallRule_d0a20f59_77b9_4f28_a63b_e58496b73a6c(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for firewall rule d0a20f59-77b9-4f28-a63b-e58496b73a6c."""
    status = httplib.OK
    payload = self.fixtures.load("network_firewallRule_d0a20f59_77b9_4f28_a63b_e58496b73a6c.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editFirewallRule(
    self, method, url, body, headers
):
    """Check the request root is ``editFirewallRule`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}editFirewallRule"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_editFirewallRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteFirewallRule(
    self, method, url, body, headers
):
    """Check the request root is ``deleteFirewallRule`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deleteFirewallRule"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_deleteFirewallRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createNatRule(
    self, method, url, body, headers
):
    """Check the request root is ``createNatRule`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}createNatRule"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_createNatRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_natRule(
    self, method, url, body, headers
):
    """Reply 200 with the ``network_natRule.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("network_natRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_natRule_2187a636_7ebb_49a1_a2ff_5d617f496dce(
    self, method, url, body, headers
):
    """Reply 200 with the fixture for NAT rule 2187a636-7ebb-49a1-a2ff-5d617f496dce."""
    status = httplib.OK
    payload = self.fixtures.load("network_natRule_2187a636_7ebb_49a1_a2ff_5d617f496dce.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteNatRule(
    self, method, url, body, headers
):
    """Check the request root is ``deleteNatRule`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deleteNatRule"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("network_deleteNatRule.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_addNic(
    self, method, url, body, headers
):
    """Check the request root is ``addNic`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}addNic"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("server_addNic.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_removeNic(
    self, method, url, body, headers
):
    """Check the request root is ``removeNic`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}removeNic"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("server_removeNic.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_disableServerMonitoring(
    self, method, url, body, headers
):
    """Check the request root is ``disableServerMonitoring`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}disableServerMonitoring"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("server_disableServerMonitoring.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_enableServerMonitoring(
    self, method, url, body, headers
):
    """Check the request root is ``enableServerMonitoring`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}enableServerMonitoring"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("server_enableServerMonitoring.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeServerMonitoringPlan(
    self, method, url, body, headers
):
    """Check the request root is ``changeServerMonitoringPlan`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}changeServerMonitoringPlan"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("server_changeServerMonitoringPlan.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage(
    self, method, url, body, headers
):
    """Reply 200 with the ``2.4/image_osImage.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("2.4/image_osImage.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_c14b1a46_2428_44c1_9c1a_b20e6418d08c(
    self, method, url, body, headers
):
    """Reply 200 with the 2.4 fixture for OS image c14b1a46-2428-44c1-9c1a-b20e6418d08c."""
    status = httplib.OK
    payload = self.fixtures.load("2.4/image_osImage_c14b1a46_2428_44c1_9c1a_b20e6418d08c.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_6b4fb0c7_a57b_4f58_b59c_9958f94f971a(
    self, method, url, body, headers
):
    """Reply 200 with the 2.4 fixture for OS image 6b4fb0c7-a57b-4f58-b59c-9958f94f971a."""
    status = httplib.OK
    payload = self.fixtures.load("2.4/image_osImage_6b4fb0c7_a57b_4f58_b59c_9958f94f971a.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d(
    self, method, url, body, headers
):
    """Reply 400 with ``image_osImage_BAD_REQUEST.xml`` for this OS image id.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c(
    self, method, url, body, headers
):
    """Reply 400 with ``image_osImage_BAD_REQUEST.xml`` for this OS image id.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_FAKE_IMAGE_ID(
    self, method, url, body, headers
):
    """Reply 400 with ``image_osImage_BAD_REQUEST.xml`` for the fake OS image id.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage(
    self, method, url, body, headers
):
    """Reply 200 with the ``2.4/image_customerImage.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("2.4/image_customerImage.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d(
    self, method, url, body, headers
):
    """Reply 200 with the 2.4 fixture for customer image 5234e5c7-01de-4411-8b6e-baeb8d91cf5d."""
    fixture_name = "2.4/image_customerImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d.xml"
    status = httplib.OK
    payload = self.fixtures.load(fixture_name)
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c(
    self, method, url, body, headers
):
    """Reply 200 with the 2.4 fixture for customer image 2ffa36c8-1848-49eb-b4fa-9d908775f68c."""
    fixture_name = "2.4/image_customerImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c.xml"
    status = httplib.OK
    payload = self.fixtures.load(fixture_name)
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_FAKE_IMAGE_ID(
    self, method, url, body, headers
):
    """Reply 400 with ``image_customerImage_BAD_REQUEST.xml`` for the fake customer image id.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("image_customerImage_BAD_REQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_reconfigureServer(
    self, method, url, body, headers
):
    """Check the request root is ``reconfigureServer`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}reconfigureServer"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("server_reconfigureServer.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_cleanServer(
    self, method, url, body, headers
):
    """Reply 200 with the ``server_cleanServer.xml`` fixture; the request body is not inspected."""
    status = httplib.OK
    payload = self.fixtures.load("server_cleanServer.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_addDisk(
    self, method, url, body, headers
):
    """Reply 200 with the ``server_addDisk.xml`` fixture; the request body is not inspected."""
    status = httplib.OK
    payload = self.fixtures.load("server_addDisk.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_removeDisk(
    self, method, url, body, headers
):
    """Reply 200 with the ``server_removeDisk.xml`` fixture; the request body is not inspected."""
    status = httplib.OK
    payload = self.fixtures.load("server_removeDisk.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey(
    self, method, url, body, headers
):
    """Validate a default-valued ``createTagKey`` request and reply 200 with its fixture.

    The request must have a ``name``, no ``description``, and both
    ``valueRequired`` and ``displayOnReport`` equal to ``"true"``.
    Raises InvalidRequestError for a wrong root tag and ValueError for any
    field that breaks those expectations.
    """
    root = ET.fromstring(body)
    if root.tag != "{urn:didata.com:api:cloud:types}createTagKey":
        raise InvalidRequestError(root.tag)

    key_name = findtext(root, "name", TYPES_URN)
    key_description = findtext(root, "description", TYPES_URN)
    needs_value = findtext(root, "valueRequired", TYPES_URN)
    on_report = findtext(root, "displayOnReport", TYPES_URN)

    if key_name is None:
        raise ValueError("Name must have a value in the request")
    if key_description is not None:
        raise ValueError("Default description for a tag should be blank")
    # A missing element yields None, which also differs from "true".
    if needs_value != "true":
        raise ValueError("Default valueRequired should be true")
    if on_report != "true":
        raise ValueError("Default displayOnReport should be true")

    status = httplib.OK
    payload = self.fixtures.load("tag_createTagKey.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey_ALLPARAMS(
    self, method, url, body, headers
):
    """Validate a fully-specified ``createTagKey`` request and reply 200 with its fixture.

    The request must have a ``name`` and a ``description``, and both
    ``valueRequired`` and ``displayOnReport`` equal to ``"false"``.
    Raises InvalidRequestError for a wrong root tag and ValueError for any
    field that breaks those expectations.
    """
    root = ET.fromstring(body)
    if root.tag != "{urn:didata.com:api:cloud:types}createTagKey":
        raise InvalidRequestError(root.tag)

    key_name = findtext(root, "name", TYPES_URN)
    key_description = findtext(root, "description", TYPES_URN)
    needs_value = findtext(root, "valueRequired", TYPES_URN)
    on_report = findtext(root, "displayOnReport", TYPES_URN)

    if key_name is None:
        raise ValueError("Name must have a value in the request")
    if key_description is None:
        raise ValueError("Description should have a value")
    # A missing element yields None, which also differs from "false".
    if needs_value != "false":
        raise ValueError("valueRequired should be false")
    if on_report != "false":
        raise ValueError("displayOnReport should be false")

    status = httplib.OK
    payload = self.fixtures.load("tag_createTagKey.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey_BADREQUEST(
    self, method, url, body, headers
):
    """Reply 400 with the ``tag_createTagKey_BADREQUEST.xml`` fixture.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("tag_createTagKey_BADREQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey(self, method, url, body, headers):
    """Reply 200 with the ``tag_tagKey_list.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("tag_tagKey_list.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_SINGLE(
    self, method, url, body, headers
):
    """Reply 200 with the ``tag_tagKey_list_SINGLE.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("tag_tagKey_list_SINGLE.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_ALLFILTERS(
    self, method, url, body, headers
):
    """Verify every query parameter of a fully-filtered tag key listing.

    Each ``key=value`` pair in the URL must be a known filter carrying its
    expected value; an unknown key raises ValueError, a wrong value fails the
    assertion. On success replies 200 with ``tag_tagKey_list.xml``.
    """
    expected = {
        "id": "fake_id",
        "name": "fake_name",
        "valueRequired": "false",
        "displayOnReport": "false",
        "pageSize": "250",
    }
    _path, query = url.split("?")
    for pair in query.split("&"):
        name, actual = pair.split("=")
        if name not in expected:
            raise ValueError("Could not find in url parameters {}:{}".format(name, actual))
        assert actual == expected[name]
    status = httplib.OK
    payload = self.fixtures.load("tag_tagKey_list.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_d047c609_93d7_4bc5_8fc9_732c85840075(
    self, method, url, body, headers
):
    """Reply 200 with a single tag key fixture for tag key d047c609-93d7-4bc5-8fc9-732c85840075."""
    # NOTE(review): the fixture file name carries a different id (5ab77f5f-...)
    # than this route; presumably intentional reuse - confirm.
    status = httplib.OK
    payload = self.fixtures.load("tag_tagKey_5ab77f5f_5aa9_426f_8459_4eab34e03d54.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_d047c609_93d7_4bc5_8fc9_732c85840075_NOEXIST(
    self, method, url, body, headers
):
    """Reply 400 with the single-tag-key BADREQUEST fixture.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("tag_tagKey_5ab77f5f_5aa9_426f_8459_4eab34e03d54_BADREQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NAME(
    self, method, url, body, headers
):
    """Validate a name-only ``editTagKey`` request and reply 200 with its fixture.

    The request must have a ``name`` and must not carry ``description``,
    ``valueRequired`` or ``displayOnReport``. Raises InvalidRequestError for a
    wrong root tag and ValueError for any field that breaks those expectations.
    """
    root = ET.fromstring(body)
    if root.tag != "{urn:didata.com:api:cloud:types}editTagKey":
        raise InvalidRequestError(root.tag)

    key_name = findtext(root, "name", TYPES_URN)
    key_description = findtext(root, "description", TYPES_URN)
    needs_value = findtext(root, "valueRequired", TYPES_URN)
    on_report = findtext(root, "displayOnReport", TYPES_URN)

    if key_name is None:
        raise ValueError("Name must have a value in the request")
    if key_description is not None:
        raise ValueError("Description should be empty")
    if needs_value is not None:
        raise ValueError("valueRequired should be empty")
    if on_report is not None:
        raise ValueError("displayOnReport should be empty")

    status = httplib.OK
    payload = self.fixtures.load("tag_editTagKey.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NOTNAME(
    self, method, url, body, headers
):
    """Validate an ``editTagKey`` request that changes everything but the name.

    The request must not carry ``name`` and must carry ``description``,
    ``valueRequired`` and ``displayOnReport``. Raises InvalidRequestError for a
    wrong root tag and ValueError for any field that breaks those expectations.
    On success replies 200 with ``tag_editTagKey.xml``.
    """
    root = ET.fromstring(body)
    if root.tag != "{urn:didata.com:api:cloud:types}editTagKey":
        raise InvalidRequestError(root.tag)

    key_name = findtext(root, "name", TYPES_URN)
    key_description = findtext(root, "description", TYPES_URN)
    needs_value = findtext(root, "valueRequired", TYPES_URN)
    on_report = findtext(root, "displayOnReport", TYPES_URN)

    if key_name is not None:
        raise ValueError("Name should be empty")
    if key_description is None:
        raise ValueError("Description should not be empty")
    if needs_value is None:
        raise ValueError("valueRequired should not be empty")
    if on_report is None:
        raise ValueError("displayOnReport should not be empty")

    status = httplib.OK
    payload = self.fixtures.load("tag_editTagKey.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NOCHANGE(
    self, method, url, body, headers
):
    """Reply 400 with the ``tag_editTagKey_BADREQUEST.xml`` fixture.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("tag_editTagKey_BADREQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_deleteTagKey(
    self, method, url, body, headers
):
    """Check the request root is ``deleteTagKey`` and reply 200 with its fixture.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deleteTagKey"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("tag_deleteTagKey.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_deleteTagKey_NOEXIST(
    self, method, url, body, headers
):
    """Reply 400 with the ``tag_deleteTagKey_BADREQUEST.xml`` fixture.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("tag_deleteTagKey_BADREQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags(
    self, method, url, body, headers
):
    """Validate an ``applyTags`` request that carries a tag value and reply 200.

    The request must have ``assetType`` and ``assetId``, and its ``tag``
    element must have both ``tagKeyName`` and ``value``. Raises
    InvalidRequestError for a wrong root tag and ValueError for a missing field.
    On success replies with ``tag_applyTags.xml``.
    """
    root = ET.fromstring(body)
    if root.tag != "{urn:didata.com:api:cloud:types}applyTags":
        raise InvalidRequestError(root.tag)

    kind_of_asset = findtext(root, "assetType", TYPES_URN)
    id_of_asset = findtext(root, "assetId", TYPES_URN)
    tag_element = root.find(fixxpath("tag", TYPES_URN))
    key_name = findtext(tag_element, "tagKeyName", TYPES_URN)
    tag_value = findtext(tag_element, "value", TYPES_URN)

    if kind_of_asset is None:
        raise ValueError("assetType should not be empty")
    if id_of_asset is None:
        raise ValueError("assetId should not be empty")
    if key_name is None:
        raise ValueError("tagKeyName should not be empty")
    if tag_value is None:
        raise ValueError("value should not be empty")

    status = httplib.OK
    payload = self.fixtures.load("tag_applyTags.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags_NOVALUE(
    self, method, url, body, headers
):
    """Validate an ``applyTags`` request that carries no tag value and reply 200.

    The request must have ``assetType`` and ``assetId``, and its ``tag``
    element must have ``tagKeyName`` but no ``value``. Raises
    InvalidRequestError for a wrong root tag and ValueError when a field
    breaks those expectations. On success replies with ``tag_applyTags.xml``.
    """
    root = ET.fromstring(body)
    if root.tag != "{urn:didata.com:api:cloud:types}applyTags":
        raise InvalidRequestError(root.tag)

    kind_of_asset = findtext(root, "assetType", TYPES_URN)
    id_of_asset = findtext(root, "assetId", TYPES_URN)
    tag_element = root.find(fixxpath("tag", TYPES_URN))
    key_name = findtext(tag_element, "tagKeyName", TYPES_URN)
    tag_value = findtext(tag_element, "value", TYPES_URN)

    if kind_of_asset is None:
        raise ValueError("assetType should not be empty")
    if id_of_asset is None:
        raise ValueError("assetId should not be empty")
    if key_name is None:
        raise ValueError("tagKeyName should not be empty")
    if tag_value is not None:
        raise ValueError("value should be empty")

    status = httplib.OK
    payload = self.fixtures.load("tag_applyTags.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags_NOTAGKEY(
    self, method, url, body, headers
):
    """Reply 400 with the ``tag_applyTags_BADREQUEST.xml`` fixture.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("tag_applyTags_BADREQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_removeTags(
    self, method, url, body, headers
):
    """Check the request root is ``removeTags`` and reply 200 with ``tag_removeTag.xml``.

    Raises InvalidRequestError (carrying the offending tag) for any other root element.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}removeTags"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    status = httplib.OK
    payload = self.fixtures.load("tag_removeTag.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_removeTags_NOTAG(
    self, method, url, body, headers
):
    """Reply 400 with the ``tag_removeTag_BADREQUEST.xml`` fixture.

    The reason phrase is looked up from the returned status so that status and
    phrase agree.
    """
    status = httplib.BAD_REQUEST
    payload = self.fixtures.load("tag_removeTag_BADREQUEST.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tag(self, method, url, body, headers):
    """Reply 200 with the ``tag_tag_list.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("tag_tag_list.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tag_ALLPARAMS(
    self, method, url, body, headers
):
    """Verify every query parameter of a fully-filtered tag listing.

    Each ``key=value`` pair in the URL must be a known filter carrying its
    expected value; an unknown key raises ValueError, a wrong value fails the
    assertion. On success replies 200 with ``tag_tag_list.xml``.
    """
    expected = {
        "assetId": "fake_asset_id",
        "assetType": "fake_asset_type",
        "valueRequired": "false",
        "displayOnReport": "false",
        "pageSize": "250",
        "datacenterId": "fake_location",
        "value": "fake_value",
        "tagKeyName": "fake_tag_key_name",
        "tagKeyId": "fake_tag_key_id",
    }
    _path, query = url.split("?")
    for pair in query.split("&"):
        name, actual = pair.split("=")
        if name not in expected:
            raise ValueError("Could not find in url parameters {}:{}".format(name, actual))
        assert actual == expected[name]
    status = httplib.OK
    payload = self.fixtures.load("tag_tag_list.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_ipAddressList(
    self, method, url, body, headers
):
    """Reply 200 with the ``ip_address_lists.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("ip_address_lists.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_ipAddressList_FILTERBYNAME(
    self, method, url, body, headers
):
    """Reply 200 with the ``ip_address_lists_FILTERBYNAME.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("ip_address_lists_FILTERBYNAME.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createIpAddressList(
    self, method, url, body, headers
):
    """Validate a ``createIpAddressList`` request and reply 200 with its fixture.

    The request must carry ``networkDomainId``, ``name`` and ``ipVersion`` and
    at least one ``ipAddress`` or ``childIpAddressListId`` element. When
    ``ipAddress`` elements are present, the first must have a ``begin``
    attribute.

    Raises InvalidRequestError for a wrong root tag and ValueError for any
    missing piece.
    """
    request = ET.fromstring(body)
    if request.tag != "{urn:didata.com:api:cloud:types}createIpAddressList":
        raise InvalidRequestError(request.tag)

    if findtext(request, "networkDomainId", TYPES_URN) is None:
        raise ValueError("Network Domain should not be empty")
    if findtext(request, "name", TYPES_URN) is None:
        raise ValueError("Name should not be empty")
    if findtext(request, "ipVersion", TYPES_URN) is None:
        raise ValueError("IP Version should not be empty")

    ip_addresses = findall(request, "ipAddress", TYPES_URN)
    child_lists = findall(request, "childIpAddressListId", TYPES_URN)
    if not ip_addresses and not child_lists:
        raise ValueError(
            "At least one ipAddress element or "
            "one childIpAddressListId element must be "
            "provided."
        )

    # A request made only of child list ids is valid per the check above, so
    # only look at the first ipAddress when there is one.
    if ip_addresses and ip_addresses[0].get("begin") is None:
        raise ValueError("IP Address should not be empty")

    status = httplib.OK
    payload = self.fixtures.load("ip_address_list_create.xml")
    return status, payload, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editIpAddressList(
    self, method, url, body, headers
):
    """Validate an ``editIpAddressList`` request and reply 200 with its fixture.

    The root element must have an ``id`` attribute, must not carry ``name`` or
    ``ipVersion``, and must carry at least one ``ipAddress`` or
    ``childIpAddressListId`` element. When ``ipAddress`` elements are present,
    the first must have a ``begin`` attribute.

    Raises InvalidRequestError for a wrong root tag and ValueError for any
    violated expectation.
    """
    request = ET.fromstring(body)
    if request.tag != "{urn:didata.com:api:cloud:types}editIpAddressList":
        raise InvalidRequestError(request.tag)

    if request.get("id") is None:
        raise ValueError("IpAddressList ID should not be empty")
    if findtext(request, "name", TYPES_URN) is not None:
        raise ValueError("Name should not exists in request")
    if findtext(request, "ipVersion", TYPES_URN) is not None:
        raise ValueError("IP Version should not exists in request")

    ip_addresses = findall(request, "ipAddress", TYPES_URN)
    child_lists = findall(request, "childIpAddressListId", TYPES_URN)
    if not ip_addresses and not child_lists:
        raise ValueError(
            "At least one ipAddress element or "
            "one childIpAddressListId element must be "
            "provided."
        )

    # A request made only of child list ids is valid per the check above, so
    # only look at the first ipAddress when there is one.
    if ip_addresses and ip_addresses[0].get("begin") is None:
        raise ValueError("IP Address should not be empty")

    status = httplib.OK
    payload = self.fixtures.load("ip_address_list_edit.xml")
    return status, payload, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteIpAddressList(
    self, method, url, body, headers
):
    """Validate a ``deleteIpAddressList`` request and reply 200 with its fixture.

    Raises InvalidRequestError for a wrong root tag and ValueError when the
    root element has no ``id`` attribute.
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deleteIpAddressList"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)
    if root.get("id") is None:
        raise ValueError("IpAddressList ID should not be empty")
    status = httplib.OK
    payload = self.fixtures.load("ip_address_list_delete.xml")
    return status, payload, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_portList(
    self, method, url, body, headers
):
    """Reply 200 with the ``port_list_lists.xml`` fixture."""
    status = httplib.OK
    payload = self.fixtures.load("port_list_lists.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_portList_c8c92ea3_2da8_4d51_8153_f39bec794d69(
    self, method, url, body, headers
):
    """Reply 200 with the ``port_list_get.xml`` fixture for port list c8c92ea3-2da8-4d51-8153-f39bec794d69."""
    status = httplib.OK
    payload = self.fixtures.load("port_list_get.xml")
    return (status, payload, {}, httplib.responses[status])
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createPortList(
    self, method, url, body, headers
):
    """Validate a ``createPortList`` request and reply 200 with its fixture.

    The request must carry ``networkDomainId`` and at least one ``port`` or
    ``childPortListId`` element. When ``port`` elements are present, the first
    must have a ``begin`` attribute.

    Raises InvalidRequestError for a wrong root tag and ValueError for any
    missing piece.
    """
    request = ET.fromstring(body)
    if request.tag != "{urn:didata.com:api:cloud:types}createPortList":
        raise InvalidRequestError(request.tag)

    if findtext(request, "networkDomainId", TYPES_URN) is None:
        raise ValueError("Network Domain should not be empty")

    ports = findall(request, "port", TYPES_URN)
    child_lists = findall(request, "childPortListId", TYPES_URN)
    if not ports and not child_lists:
        raise ValueError(
            "At least one port element or one " "childPortListId element must be provided"
        )

    # A request made only of child port list ids is valid per the check above,
    # so only look at the first port when there is one.
    if ports and ports[0].get("begin") is None:
        raise ValueError("PORT begin value should not be empty")

    status = httplib.OK
    payload = self.fixtures.load("port_list_create.xml")
    return status, payload, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editPortList(
    self, method, url, body, headers
):
    """
    Mock handler for the ``editPortList`` request.

    The XML request ``body`` is parsed and validated before the canned
    ``port_list_edit.xml`` fixture is returned. ``method``, ``url`` and
    ``headers`` are accepted but not inspected.

    :raises InvalidRequestError: if the root element is not
        ``editPortList`` in the cloud types namespace
    :raises ValueError: if the request contains neither a ``port`` nor a
        ``childPortListId`` element, or if any ``port`` element has no
        ``begin`` attribute

    :return: ``(status, body, headers, reason)`` tuple
    """
    request = ET.fromstring(body)
    if request.tag != "{urn:didata.com:api:cloud:types}editPortList":
        raise InvalidRequestError(request.tag)

    ports = findall(request, "port", TYPES_URN)
    child_port_lists = findall(request, "childPortListId", TYPES_URN)
    if not ports and not child_port_lists:
        raise ValueError(
            "At least one port element or one " "childPortListId element must be provided"
        )

    # A request may consist solely of childPortListId elements, in which
    # case ``ports`` is empty; indexing ports[0] would raise IndexError.
    # Validate every port that is present instead of only the first one.
    if any(port.get("begin") is None for port in ports):
        raise ValueError("PORT begin value should not be empty")

    body = self.fixtures.load("port_list_edit.xml")
    return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deletePortList(
    self, method, url, body, headers
):
    """
    Mock handler for the ``deletePortList`` request.

    The XML request ``body`` is parsed and checked before a canned fixture
    is returned. ``method``, ``url`` and ``headers`` are accepted but not
    inspected.

    :raises InvalidRequestError: if the root element is not
        ``deletePortList`` in the cloud types namespace
    :raises ValueError: if the root element has no ``id`` attribute

    :return: ``(status, body, headers, reason)`` tuple
    """
    root = ET.fromstring(body)
    expected_tag = "{urn:didata.com:api:cloud:types}deletePortList"
    if root.tag != expected_tag:
        raise InvalidRequestError(root.tag)

    # The list to delete is identified by the ``id`` attribute of the root.
    if root.get("id") is None:
        raise ValueError("Port List ID should not be empty")

    # NOTE(review): this serves the IP address list delete fixture rather
    # than a port-list specific one -- confirm whether that is intentional.
    response_body = self.fixtures.load("ip_address_list_delete.xml")
    status = httplib.OK
    return status, response_body, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_cloneServer(
    self, method, url, body, headers
):
    """
    Mock handler for the ``server/cloneServer`` endpoint.

    Ignores the request arguments and always answers with the
    ``2.4/server_clone_response.xml`` fixture.

    :return: ``(status, body, headers, reason)`` tuple
    """
    status = httplib.OK
    response_body = self.fixtures.load("2.4/server_clone_response.xml")
    return status, response_body, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_importImage(
    self, method, url, body, headers
):
    """
    Mock handler for the ``image/importImage`` endpoint.

    Ignores the request arguments and always answers with the
    ``2.4/import_image_response.xml`` fixture.

    :return: ``(status, body, headers, reason)`` tuple
    """
    status = httplib.OK
    response_body = self.fixtures.load("2.4/import_image_response.xml")
    return status, response_body, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_exchangeNicVlans(
    self, method, url, body, headers
):
    """
    Mock handler for the ``server/exchangeNicVlans`` endpoint.

    Ignores the request arguments and always answers with the
    ``2.4/exchange_nic_vlans_response.xml`` fixture.

    :return: ``(status, body, headers, reason)`` tuple
    """
    status = httplib.OK
    response_body = self.fixtures.load("2.4/exchange_nic_vlans_response.xml")
    return status, response_body, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeNetworkAdapter(
    self, method, url, body, headers
):
    """
    Mock handler for the ``server/changeNetworkAdapter`` endpoint.

    Ignores the request arguments and always answers with the
    ``2.4/change_nic_networkadapter_response.xml`` fixture.

    :return: ``(status, body, headers, reason)`` tuple
    """
    status = httplib.OK
    response_body = self.fixtures.load("2.4/change_nic_networkadapter_response.xml")
    return status, response_body, {}, httplib.responses[status]
def _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployUncustomizedServer(
    self, method, url, body, headers
):
    """
    Mock handler for the ``server/deployUncustomizedServer`` endpoint.

    Ignores the request arguments and always answers with the
    ``2.4/deploy_customised_server.xml`` fixture.

    :return: ``(status, body, headers, reason)`` tuple
    """
    # NOTE(review): the fixture name says "customised" while the endpoint
    # is "deployUncustomizedServer" -- confirm the fixture is the intended one.
    status = httplib.OK
    response_body = self.fixtures.load("2.4/deploy_customised_server.xml")
    return status, response_body, {}, httplib.responses[status]
if __name__ == "__main__":
    # Executed directly as a script: run unittest.main() and pass whatever
    # it returns on to sys.exit.
    exit_status = unittest.main()
    sys.exit(exit_status)