blob: ff45d0863d9847223aed74f6f56c7cf66b3e880f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
from types import GeneratorType
import pytest
from libcloud.test import MockHttp
from libcloud.utils.py3 import ET, httplib
from libcloud.utils.xml import findall, findtext, fixxpath
from libcloud.common.types import InvalidCredsError
from libcloud.compute.base import Node, NodeLocation, NodeAuthPassword
from libcloud.test.secrets import NTTCIS_PARAMS
from libcloud.common.nttcis import (
TYPES_URN,
NttCisTag,
ClassFactory,
NttCisTagKey,
NttCisServerDisk,
NttCisAPIException,
NttCisServerVMWareTools,
NetworkDomainServicePlan,
NttCisServerCpuSpecification,
)
from libcloud.test.file_fixtures import ComputeFileFixtures
from libcloud.compute.drivers.nttcis import NttCisNic
from libcloud.compute.drivers.nttcis import NttCisNodeDriver as NttCis
@pytest.fixture()
def driver():
NttCis.connectionCls.active_api_version = "2.7"
NttCis.connectionCls.conn_class = NttCisMockHttp
NttCisMockHttp.type = None
driver = NttCis(*NTTCIS_PARAMS)
return driver
def test_auth_fails(driver):
with pytest.raises(ValueError):
NttCis(*NTTCIS_PARAMS, region="blah")
def test_get_account_details(driver):
NttCisMockHttp.type = None
ret = driver.connection.get_account_details()
assert ret.full_name == "Test User"
assert ret.first_name == "Test"
assert ret.email == "test@example.com"
def test_invalid_creds(driver):
NttCisMockHttp.type = "UNAUTHORIZED"
with pytest.raises(InvalidCredsError):
driver.list_nodes()
def test_list_locations_response(driver):
NttCisMockHttp.type = None
ret = driver.list_locations()
assert len(ret) == 5
first_loc = ret[0]
assert first_loc.id == "NA3"
assert first_loc.name == "US - West"
assert first_loc.country == "US"
def test_list_nodes_response(driver):
NttCisMockHttp.type = None
ret = driver.list_nodes()
assert len(ret) == 7
def test_node_extras(driver):
NttCisMockHttp.type = None
ret = driver.list_nodes()
assert isinstance(ret[0].extra["vmWareTools"], NttCisServerVMWareTools)
assert isinstance(ret[0].extra["cpu"], NttCisServerCpuSpecification)
assert isinstance(ret[0].extra["disks"], list)
assert isinstance(ret[0].extra["disks"][0], NttCisServerDisk)
assert ret[0].extra["disks"][0].size_gb, 10
assert isinstance(ret[1].extra["disks"], list)
assert isinstance(ret[1].extra["disks"][0], NttCisServerDisk)
assert ret[1].extra["disks"][0].size_gb, 10
def test_server_states(driver):
NttCisMockHttp.type = None
ret = driver.list_nodes()
assert ret[0].state == "running"
assert ret[1].state == "starting"
assert ret[2].state == "stopping"
assert ret[3].state == "reconfiguring"
assert ret[4].state == "running"
assert ret[5].state == "terminated"
assert ret[6].state == "stopped"
assert len(ret) == 7
def test_list_nodes_response_PAGINATED(driver):
NttCisMockHttp.type = "PAGINATED"
ret = driver.list_nodes()
assert len(ret) == 7
def test_paginated_mcp2_call_EMPTY(driver):
# cache org
driver.connection._get_orgId()
NttCisMockHttp.type = "EMPTY"
node_list_generator = driver.connection.paginated_request_with_orgId_api_2("server/server")
empty_node_list = []
for node_list in node_list_generator:
empty_node_list.extend(node_list)
assert len(empty_node_list) == 0
def test_paginated_mcp2_call_PAGED_THEN_EMPTY(driver):
# cache org
driver.connection._get_orgId()
NttCisMockHttp.type = "PAGED_THEN_EMPTY"
node_list_generator = driver.connection.paginated_request_with_orgId_api_2("server/server")
final_node_list = []
for node_list in node_list_generator:
final_node_list.extend(node_list)
assert len(final_node_list) == 2
def test_paginated_mcp2_call_with_page_size(driver):
# cache org
driver.connection._get_orgId()
NttCisMockHttp.type = "PAGESIZE50"
node_list_generator = driver.connection.paginated_request_with_orgId_api_2(
"server/server", page_size=50
)
assert isinstance(node_list_generator, GeneratorType)
# We're making sure here the filters make it to the URL
# See _caas_2_4_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_ALLFILTERS for asserts
def test_list_nodes_response_strings_ALLFILTERS(driver):
NttCisMockHttp.type = "ALLFILTERS"
ret = driver.list_nodes(
ex_location="fake_loc",
ex_name="fake_name",
ex_ipv6="fake_ipv6",
ex_ipv4="fake_ipv4",
ex_vlan="fake_vlan",
ex_image="fake_image",
ex_deployed=True,
ex_started=True,
ex_state="fake_state",
ex_network_domain="fake_network_domain",
)
assert isinstance(ret, list)
assert len(ret) == 7
node = ret[3]
assert isinstance(node.extra["disks"], list)
assert isinstance(node.extra["disks"][0], NttCisServerDisk)
assert node.image.id == "3ebf3c0f-90fe-4a8b-8585-6e65b316592c"
assert node.image.name == "WIN2008S/32"
disk = node.extra["disks"][0]
assert disk.id == "c2e1f199-116e-4dbc-9960-68720b832b0a"
assert disk.scsi_id == 0
assert disk.size_gb == 50
assert disk.speed == "STANDARD"
assert disk.state == "NORMAL"
def test_list_nodes_response_LOCATION(driver):
NttCisMockHttp.type = None
ret = driver.list_locations()
first_loc = ret[0]
ret = driver.list_nodes(ex_location=first_loc)
for node in ret:
assert node.extra["datacenterId"] == "NA3"
def test_list_nodes_response_LOCATION_STR(driver):
NttCisMockHttp.type = None
ret = driver.list_nodes(ex_location="NA3")
for node in ret:
assert node.extra["datacenterId"] == "NA3"
def test_list_sizes_response(driver):
NttCisMockHttp.type = None
ret = driver.list_sizes()
assert len(ret) == 1
size = ret[0]
assert size.name == "default"
def test_list_datacenter_snapshot_windows(driver):
NttCisMockHttp.type = None
ret = driver.list_snapshot_windows("f1d6a564-490e-4166-b91d-feddc1f92025", "ADVANCED")
assert isinstance(ret[0], dict)
def test_list_snapshots(driver):
NttCisMockHttp.type = None
snapshots = driver.list_snapshots("sdk_server_1", page_size=1)
assert len(snapshots) == 1
assert snapshots[0]["id"] == "d11940a8-1455-43bf-a2de-b51a38c2aa94"
def test_enable_snapshot_service(driver):
NttCisMockHttp.type = None
window_id = "ea646520-4272-11e8-838c-180373fb68df"
node = "e1eb7d71-93c9-4b9c-807c-e05932dc8143"
result = driver.ex_enable_snapshots(node, window_id)
assert result is True
def test_initiate_manual_snapshot(driver):
NttCisMockHttp.type = None
result = driver.ex_initiate_manual_snapshot("test", "e1eb7d71-93c9-4b9c-807c-e05932dc8143")
assert result is True
def test_create_snapshot_preview_server(driver):
snapshot_id = "dd9a9e7e-2de7-4543-adef-bb1fda7ac030"
server_name = "test_snapshot"
start = "true"
nic_connected = "true"
result = driver.ex_create_snapshot_preview_server(
snapshot_id, server_name, start, nic_connected
)
assert result is True
def test_disable_node_snapshot(driver):
node = "e1eb7d71-93c9-4b9c-807c-e05932dc8143"
assert driver.ex_disable_snapshots(node) is True
def test_reboot_node_response(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = node.reboot()
assert ret is True
def test_reboot_node_response_INPROGRESS(driver):
NttCisMockHttp.type = "INPROGRESS"
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
with pytest.raises(NttCisAPIException):
node.reboot()
def test_destroy_node_response(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = node.destroy()
assert ret is True
def test_destroy_node_response_RESOURCE_BUSY(driver):
NttCisMockHttp.type = "INPROGRESS"
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
with pytest.raises(NttCisAPIException):
node.destroy()
def test_list_images(driver):
images = driver.list_images()
assert len(images) == 3
assert images[0].name == "RedHat 6 64-bit 2 CPU"
assert images[0].id == "c14b1a46-2428-44c1-9c1a-b20e6418d08c"
assert images[0].extra["location"].id == "NA9"
assert images[0].extra["cpu"].cpu_count == 2
assert images[0].extra["OS_displayName"] == "REDHAT6/64"
def test_clean_failed_deployment_response_with_node(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = driver.ex_clean_failed_deployment(node)
assert ret is True
def test_clean_failed_deployment_response_with_node_id(driver):
node = "e75ead52-692f-4314-8725-c8a4f4d13a87"
ret = driver.ex_clean_failed_deployment(node)
assert ret is True
def test_ex_list_customer_images(driver):
images = driver.ex_list_customer_images()
assert len(images) == 3
assert images[0].name == "ImportedCustomerImage"
assert images[0].id == "5234e5c7-01de-4411-8b6e-baeb8d91cf5d"
assert images[0].extra["location"].id == "NA9"
assert images[0].extra["cpu"].cpu_count == 4
assert images[0].extra["OS_displayName"] == "REDHAT6/64"
def test_create_mcp1_node_optional_param(driver):
root_pw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
network = driver.ex_list_networks()[0]
cpu_spec = NttCisServerCpuSpecification(
cpu_count="4", cores_per_socket="2", performance="STANDARD"
)
disks = [NttCisServerDisk(scsi_id="0", speed="HIGHPERFORMANCE")]
node = driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
ex_memory_gb=8,
ex_disks=disks,
ex_cpu_specification=cpu_spec,
ex_primary_dns="10.0.0.5",
ex_secondary_dns="10.0.0.6",
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_mcp1_node_response_no_pass_random_gen(driver):
image = driver.list_images()[0]
network = driver.ex_list_networks()[0]
node = driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
assert "password" in node.extra
def test_create_mcp1_node_response_no_pass_customer_windows(driver):
image = driver.ex_list_customer_images()[1]
network = driver.ex_list_networks()[0]
node = driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
assert "password" in node.extra
def test_create_mcp1_node_response_no_pass_customer_windows_STR(driver):
image = driver.ex_list_customer_images()[1].id
network = driver.ex_list_networks()[0]
node = driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
assert "password" in node.extra
def test_create_mcp1_node_response_no_pass_customer_linux(driver):
image = driver.ex_list_customer_images()[0]
network = driver.ex_list_networks()[0]
node = driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
assert "password" not in node.extra
def test_create_mcp1_node_response_no_pass_customer_linux_STR(driver):
image = driver.ex_list_customer_images()[0].id
network = driver.ex_list_networks()[0]
node = driver.create_node(
name="test2",
image=image,
auth=None,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
assert "password" not in node.extra
def test_create_mcp1_node_response_STR(driver):
rootPw = "pass123"
image = driver.list_images()[0].id
network = driver.ex_list_networks()[0].id
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network=network,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_mcp1_node_no_network(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(InvalidRequestError):
driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network=None,
ex_is_started=False,
)
def test_create_node_mcp1_ipv4(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network="fakenetwork",
ex_primary_ipv4="10.0.0.1",
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_mcp1_network(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network="fakenetwork",
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_response_network_domain(driver):
rootPw = NodeAuthPassword("pass123")
location = driver.ex_get_location_by_id("NA9")
image = driver.list_images(location=location)[0]
network_domain = driver.ex_list_network_domains(location=location)[0]
vlan = driver.ex_list_vlans(location=location)[0]
cpu = NttCisServerCpuSpecification(
cpu_count=4, cores_per_socket=1, performance="HIGHPERFORMANCE"
)
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain=network_domain,
ex_vlan=vlan,
ex_is_started=False,
ex_cpu_specification=cpu,
ex_memory_gb=4,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_response_network_domain_STR(driver):
rootPw = NodeAuthPassword("pass123")
location = driver.ex_get_location_by_id("NA9")
image = driver.list_images(location=location)[0]
network_domain = driver.ex_list_network_domains(location=location)[0].id
vlan = driver.ex_list_vlans(location=location)[0].id
cpu = NttCisServerCpuSpecification(
cpu_count=4, cores_per_socket=1, performance="HIGHPERFORMANCE"
)
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain=network_domain,
ex_vlan=vlan,
ex_is_started=False,
ex_cpu_specification=cpu,
ex_memory_gb=4,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_mcp2_vlan(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_vlan="fakevlan",
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_mcp2_ipv4(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_ipv4="10.0.0.1",
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_network_domain_no_vlan_or_ipv4(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(ValueError):
driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_is_started=False,
)
def test_create_node_response(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_vlan="fakevlan",
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_ms_time_zone(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_vlan="fakevlan",
ex_microsoft_time_zone="040",
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_ambigious_mcps_fail(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(ValueError):
driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_network="fakenetwork",
ex_primary_nic_vlan="fakevlan",
)
def test_create_node_no_network_domain_fail(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(ValueError):
driver.create_node(name="test3", image=image, auth=rootPw, ex_primary_nic_vlan="fakevlan")
def test_create_node_no_primary_nic_fail(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(ValueError):
driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
)
def test_create_node_primary_vlan_nic(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_vlan="fakevlan",
ex_primary_nic_network_adapter="v1000",
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_primary_ipv4(driver):
rootPw = "pass123"
image = driver.list_images()[0]
node = driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_both_primary_nic_and_vlan_fail(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(ValueError):
driver.create_node(
name="test3",
image=image,
auth=rootPw,
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_primary_nic_vlan="fakevlan",
)
def test_create_node_cpu_specification(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
cpu_spec = NttCisServerCpuSpecification(
cpu_count="4", cores_per_socket="2", performance="STANDARD"
)
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_cpu_specification=cpu_spec,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_memory(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_memory_gb=8,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_disks(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
disks = [NttCisServerDisk(scsi_id="0", speed="HIGHPERFORMANCE")]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_disks=disks,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_disks_fail(driver):
root_pw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
disks = "blah"
with pytest.raises(TypeError):
driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_disks=disks,
)
def test_create_node_ipv4_gateway(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_is_started=False,
ex_ipv4_gateway="10.2.2.2",
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_network_domain_no_vlan_no_ipv4_fail(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(ValueError):
driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_is_started=False,
)
def test_create_node_mcp2_additional_nics_legacy(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
additional_vlans = ["fakevlan1", "fakevlan2"]
additional_ipv4 = ["10.0.0.2", "10.0.0.3"]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_ipv4="10.0.0.1",
ex_additional_nics_vlan=additional_vlans,
ex_additional_nics_ipv4=additional_ipv4,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_bad_additional_nics_ipv4(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(TypeError):
driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_vlan="fake_vlan",
ex_additional_nics_ipv4="badstring",
ex_is_started=False,
)
def test_create_node_additional_nics(driver):
root_pw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
nic1 = NttCisNic(vlan="fake_vlan", network_adapter_name="v1000")
nic2 = NttCisNic(private_ip_v4="10.1.1.2", network_adapter_name="v1000")
additional_nics = [nic1, nic2]
node = driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_create_node_additional_nics_vlan_ipv4_coexist_fail(driver):
root_pw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
nic1 = NttCisNic(private_ip_v4="10.1.1.1", vlan="fake_vlan", network_adapter_name="v1000")
nic2 = NttCisNic(private_ip_v4="10.1.1.2", vlan="fake_vlan2", network_adapter_name="v1000")
additional_nics = [nic1, nic2]
with pytest.raises(ValueError):
driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
def test_create_node_additional_nics_invalid_input_fail(driver):
root_pw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
additional_nics = "blah"
with pytest.raises(TypeError):
driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
def test_create_node_additional_nics_vlan_ipv4_not_exist_fail(driver):
root_pw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
nic1 = NttCisNic(network_adapter_name="v1000")
nic2 = NttCisNic(network_adapter_name="v1000")
additional_nics = [nic1, nic2]
with pytest.raises(ValueError):
driver.create_node(
name="test2",
image=image,
auth=root_pw,
ex_description="test2 node",
ex_network_domain="fakenetworkdomain",
ex_primary_nic_private_ipv4="10.0.0.1",
ex_additional_nics=additional_nics,
ex_is_started=False,
)
def test_create_node_bad_additional_nics_vlan(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
with pytest.raises(TypeError):
driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test2 node",
ex_network_domain="fake_network_domain",
ex_vlan="fake_vlan",
ex_additional_nics_vlan="badstring",
ex_is_started=False,
)
def test_create_node_mcp2_indicate_dns(driver):
rootPw = NodeAuthPassword("pass123")
image = driver.list_images()[0]
node = driver.create_node(
name="test2",
image=image,
auth=rootPw,
ex_description="test node dns",
ex_network_domain="fakenetworkdomain",
ex_primary_ipv4="10.0.0.1",
ex_primary_dns="8.8.8.8",
ex_secondary_dns="8.8.4.4",
ex_is_started=False,
)
assert node.id == "e75ead52-692f-4314-8725-c8a4f4d13a87"
assert node.extra["status"].action == "DEPLOY_SERVER"
def test_ex_shutdown_graceful(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = driver.ex_shutdown_graceful(node)
assert ret is True
def test_ex_shutdown_graceful_INPROGRESS(driver):
NttCisMockHttp.type = "INPROGRESS"
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
with pytest.raises(NttCisAPIException):
driver.ex_shutdown_graceful(node)
def test_ex_start_node(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = driver.ex_start_node(node)
assert ret is True
def test_ex_start_node_INPROGRESS(driver):
NttCisMockHttp.type = "INPROGRESS"
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
with pytest.raises(NttCisAPIException):
driver.ex_start_node(node)
def test_ex_power_off(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = driver.ex_power_off(node)
assert ret is True
def test_ex_update_vm_tools(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = driver.ex_update_vm_tools(node)
assert ret is True
def test_ex_power_off_INPROGRESS(driver):
NttCisMockHttp.type = "INPROGRESS"
node = Node(
id="11",
name=None,
state="STOPPING",
public_ips=None,
private_ips=None,
driver=driver,
)
with pytest.raises(NttCisAPIException):
driver.ex_power_off(node)
def test_ex_reset(driver):
node = Node(id="11", name=None, state=None, public_ips=None, private_ips=None, driver=driver)
ret = driver.ex_reset(node)
assert ret is True
def test_ex_attach_node_to_vlan(driver):
node = driver.ex_get_node_by_id("e75ead52-692f-4314-8725-c8a4f4d13a87")
vlan = driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
ret = driver.ex_attach_node_to_vlan(node, vlan)
assert ret is True
def test_ex_destroy_nic(driver):
node = driver.ex_destroy_nic("a202e51b-41c0-4cfc-add0-b1c62fc0ecf6")
assert node is True
def test_ex_create_network_domain(driver):
location = driver.ex_get_location_by_id("NA9")
plan = NetworkDomainServicePlan.ADVANCED
net = driver.ex_create_network_domain(
location=location, name="test", description="test", service_plan=plan
)
assert net.name == "test"
assert net.id == "f14a871f-9a25-470c-aef8-51e13202e1aa"
def test_ex_create_network_domain_NO_DESCRIPTION(driver):
location = driver.ex_get_location_by_id("NA9")
plan = NetworkDomainServicePlan.ADVANCED
net = driver.ex_create_network_domain(location=location, name="test", service_plan=plan)
assert net.name == "test"
assert net.id == "f14a871f-9a25-470c-aef8-51e13202e1aa"
def test_ex_get_network_domain(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
assert net.id == "8cdfd607-f429-4df6-9352-162cfc0891be"
assert net.description == "test2"
assert net.name == "test"
def test_ex_update_network_domain(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
net.name = "new name"
net2 = driver.ex_update_network_domain(net)
assert net2.name == "new name"
def test_ex_delete_network_domain(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
result = driver.ex_delete_network_domain(net)
assert result is True
def test_ex_list_network_domains(driver):
nets = driver.ex_list_network_domains()
assert nets[0].name == "Aurora"
assert isinstance(nets[0].location, NodeLocation)
def test_ex_list_network_domains_ALLFILTERS(driver):
NttCisMockHttp.type = "ALLFILTERS"
nets = driver.ex_list_network_domains(
location="fake_location",
name="fake_name",
service_plan="fake_plan",
state="fake_state",
)
assert nets[0].name == "Aurora"
assert isinstance(nets[0].location, NodeLocation)
def test_ex_list_vlans(driver):
vlans = driver.ex_list_vlans()
assert vlans[0].name == "Primary"
def test_ex_list_vlans_ALLFILTERS(driver):
NttCisMockHttp.type = "ALLFILTERS"
vlans = driver.ex_list_vlans(
location="fake_location",
network_domain="fake_network_domain",
name="fake_name",
ipv4_address="fake_ipv4",
ipv6_address="fake_ipv6",
state="fake_state",
)
assert vlans[0].name == "Primary"
def test_ex_create_vlan(
driver,
):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
vlan = driver.ex_create_vlan(
network_domain=net,
name="test",
private_ipv4_base_address="10.3.4.0",
private_ipv4_prefix_size="24",
description="test vlan",
)
assert vlan.id == "0e56433f-d808-4669-821d-812769517ff8"
def test_ex_create_vlan_NO_DESCRIPTION(
driver,
):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
vlan = driver.ex_create_vlan(
network_domain=net,
name="test",
private_ipv4_base_address="10.3.4.0",
private_ipv4_prefix_size="24",
)
assert vlan.id == "0e56433f-d808-4669-821d-812769517ff8"
def test_ex_get_vlan(driver):
vlan = driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
assert vlan.id == "0e56433f-d808-4669-821d-812769517ff8"
assert vlan.description == "test2"
assert vlan.status == "NORMAL"
assert vlan.name == "Production VLAN"
assert vlan.private_ipv4_range_address == "10.0.3.0"
assert vlan.private_ipv4_range_size == 24
assert vlan.ipv6_range_size == 64
assert vlan.ipv6_range_address == "2607:f480:1111:1153:0:0:0:0"
assert vlan.ipv4_gateway == "10.0.3.1"
assert vlan.ipv6_gateway == "2607:f480:1111:1153:0:0:0:1"
def test_ex_wait_for_state(driver):
driver.ex_wait_for_state(
"NORMAL",
driver.ex_get_vlan,
vlan_id="0e56433f-d808-4669-821d-812769517ff8",
poll_interval=0.1,
)
def test_ex_wait_for_state_NODE(driver):
driver.ex_wait_for_state(
"running",
driver.ex_get_node_by_id,
id="e75ead52-692f-4314-8725-c8a4f4d13a87",
poll_interval=0.1,
)
def test_ex_wait_for_state_FAIL(driver):
with pytest.raises(NttCisAPIException) as context:
driver.ex_wait_for_state(
"starting",
driver.ex_get_node_by_id,
id="e75ead52-692f-4314-8725-c8a4f4d13a87",
poll_interval=0.1,
timeout=0.1,
)
assert context.value.code == "running"
assert "timed out" in context.value.msg
def test_ex_update_vlan(driver):
vlan = driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
vlan.name = "new name"
vlan2 = driver.ex_update_vlan(vlan)
assert vlan2.name == "new name"
def test_ex_delete_vlan(driver):
vlan = driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
result = driver.ex_delete_vlan(vlan)
assert result is True
def test_ex_expand_vlan(driver):
vlan = driver.ex_get_vlan("0e56433f-d808-4669-821d-812769517ff8")
vlan.private_ipv4_range_size = "23"
vlan = driver.ex_expand_vlan(vlan)
assert vlan.private_ipv4_range_size == "23"
def test_ex_add_public_ip_block_to_network_domain(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
block = driver.ex_add_public_ip_block_to_network_domain(net)
assert block.id == "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8"
def test_ex_list_public_ip_blocks(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
blocks = driver.ex_list_public_ip_blocks(net)
assert blocks[0].base_ip == "168.128.4.18"
assert blocks[0].size == "2"
assert blocks[0].id == "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8"
assert blocks[0].location.id == "NA9"
assert blocks[0].network_domain.id == net.id
def test_ex_get_public_ip_block(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
block = driver.ex_get_public_ip_block("9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
assert block.base_ip == "168.128.4.18"
assert block.size == "2"
assert block.id == "9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8"
assert block.location.id == "NA9"
assert block.network_domain.id == net.id
def test_ex_delete_public_ip_block(driver):
block = driver.ex_get_public_ip_block("9945dc4a-bdce-11e4-8c14-b8ca3a5d9ef8")
result = driver.ex_delete_public_ip_block(block)
assert result is True
def test_ex_list_firewall_rules(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
assert rules[0].id == "756cba02-b0bc-48f4-aea5-9445870b6148"
assert rules[0].network_domain.id == "8cdfd607-f429-4df6-9352-162cfc0891be"
assert rules[0].name == "CCDEFAULT.BlockOutboundMailIPv4"
assert rules[0].action == "DROP"
assert rules[0].ip_version, "IPV4"
assert rules[0].protocol == "TCP"
assert rules[0].source.ip_address == "ANY"
assert rules[0].source.any_ip is True
assert rules[0].destination.any_ip is True
def test_ex_create_firewall_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
rule = driver.ex_create_firewall_rule(
net,
rules[0].name,
rules[0].action,
rules[0].ip_version,
rules[0].protocol,
rules[0].source,
rules[0].destination,
"FIRST",
)
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_create_firewall_rule_with_specific_source_ip(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
specific_source_ip_rule = list(filter(lambda x: x.name == "SpecificSourceIP", rules))[0]
rule = driver.ex_create_firewall_rule(
net,
specific_source_ip_rule.name,
specific_source_ip_rule.action,
specific_source_ip_rule.ip_version,
specific_source_ip_rule.protocol,
specific_source_ip_rule.source,
specific_source_ip_rule.destination,
"FIRST",
)
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_create_firewall_rule_with_source_ip(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
specific_source_ip_rule = list(filter(lambda x: x.name == "SpecificSourceIP", rules))[0]
specific_source_ip_rule.source.any_ip = False
specific_source_ip_rule.source.ip_address = "10.0.0.1"
specific_source_ip_rule.source.ip_prefix_size = "15"
rule = driver.ex_create_firewall_rule(
net,
specific_source_ip_rule.name,
specific_source_ip_rule.action,
specific_source_ip_rule.ip_version,
specific_source_ip_rule.protocol,
specific_source_ip_rule.source,
specific_source_ip_rule.destination,
"FIRST",
)
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_create_firewall_rule_with_any_ip(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
specific_source_ip_rule = list(filter(lambda x: x.name == "SpecificSourceIP", rules))[0]
specific_source_ip_rule.source.any_ip = True
rule = driver.ex_create_firewall_rule(
net,
specific_source_ip_rule.name,
specific_source_ip_rule.action,
specific_source_ip_rule.ip_version,
specific_source_ip_rule.protocol,
specific_source_ip_rule.source,
specific_source_ip_rule.destination,
"FIRST",
)
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_create_firewall_rule_ip_prefix_size(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_list_firewall_rules(net)[0]
rule.source.address_list_id = None
rule.source.any_ip = False
rule.source.ip_address = "10.2.1.1"
rule.source.ip_prefix_size = "10"
rule.destination.address_list_id = None
rule.destination.any_ip = False
rule.destination.ip_address = "10.0.0.1"
rule.destination.ip_prefix_size = "20"
driver.ex_create_firewall_rule(
net,
rule.name,
rule.action,
rule.ip_version,
rule.protocol,
rule.source,
rule.destination,
"LAST",
)
def test_ex_create_firewall_rule_address_list(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_list_firewall_rules(net)[0]
rule.source.address_list_id = "12345"
rule.destination.address_list_id = "12345"
driver.ex_create_firewall_rule(
net,
rule.name,
rule.action,
rule.ip_version,
rule.protocol,
rule.source,
rule.destination,
"LAST",
)
def test_ex_create_firewall_rule_port_list(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_list_firewall_rules(net)[0]
rule.source.port_list_id = "12345"
rule.destination.port_list_id = "12345"
driver.ex_create_firewall_rule(
net,
rule.name,
rule.action,
rule.ip_version,
rule.protocol,
rule.source,
rule.destination,
"LAST",
)
def test_ex_create_firewall_rule_port(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_list_firewall_rules(net)[0]
rule.source.port_list_id = None
rule.source.port_begin = "8000"
rule.source.port_end = "8005"
rule.destination.port_list_id = None
rule.destination.port_begin = "7000"
rule.destination.port_end = "7005"
driver.ex_create_firewall_rule(
net,
rule.name,
rule.action,
rule.ip_version,
rule.protocol,
rule.source,
rule.destination,
"LAST",
)
def test_ex_create_firewall_rule_ALL_VALUES(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
for rule in rules:
driver.ex_create_firewall_rule(
net,
rule.name,
rule.action,
rule.ip_version,
rule.protocol,
rule.source,
rule.destination,
"LAST",
)
def test_ex_create_firewall_rule_WITH_POSITION_RULE(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
rule = driver.ex_create_firewall_rule(
net,
rules[-2].name,
rules[-2].action,
rules[-2].ip_version,
rules[-2].protocol,
rules[-2].source,
rules[-2].destination,
"BEFORE",
position_relative_to_rule=rules[-1],
)
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_create_firewall_rule_WITH_POSITION_RULE_STR(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
rule = driver.ex_create_firewall_rule(
net,
rules[-2].name,
rules[-2].action,
rules[-2].ip_version,
rules[-2].protocol,
rules[-2].source,
rules[-2].destination,
"BEFORE",
position_relative_to_rule="RULE_WITH_SOURCE_AND_DEST",
)
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_create_firewall_rule_FAIL_POSITION(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
with pytest.raises(ValueError):
driver.ex_create_firewall_rule(
net,
rules[0].name,
rules[0].action,
rules[0].ip_version,
rules[0].protocol,
rules[0].source,
rules[0].destination,
"BEFORE",
)
def test_ex_create_firewall_rule_FAIL_POSITION_WITH_RULE(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_firewall_rules(net)
with pytest.raises(ValueError):
driver.ex_create_firewall_rule(
net,
rules[0].name,
rules[0].action,
rules[0].ip_version,
rules[0].protocol,
rules[0].source,
rules[0].destination,
"LAST",
position_relative_to_rule="RULE_WITH_SOURCE_AND_DEST",
)
def test_ex_get_firewall_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
assert rule.id == "d0a20f59-77b9-4f28-a63b-e58496b73a6c"
def test_ex_set_firewall_rule_state(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
result = driver.ex_set_firewall_rule_state(rule, False)
assert result is True
def test_ex_delete_firewall_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
result = driver.ex_delete_firewall_rule(rule)
assert result is True
def test_ex_edit_firewall_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.source.any_ip = True
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_source_ipaddresslist(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.source.address_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
rule.source.any_ip = False
rule.source.ip_address = "10.0.0.1"
rule.source.ip_prefix_size = 10
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_destination_ipaddresslist(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.destination.address_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
rule.destination.any_ip = False
rule.destination.ip_address = "10.0.0.1"
rule.destination.ip_prefix_size = 10
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_destination_ipaddress(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.source.address_list_id = None
rule.source.any_ip = False
rule.source.ip_address = "10.0.0.1"
rule.source.ip_prefix_size = "10"
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_source_ipaddress(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.destination.address_list_id = None
rule.destination.any_ip = False
rule.destination.ip_address = "10.0.0.1"
rule.destination.ip_prefix_size = "10"
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_with_relative_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
placement_rule = driver.ex_list_firewall_rules(network_domain=net)[-1]
result = driver.ex_edit_firewall_rule(
rule=rule, position="BEFORE", relative_rule_for_position=placement_rule
)
assert result is True
def test_ex_edit_firewall_rule_with_relative_rule_by_name(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
placement_rule = driver.ex_list_firewall_rules(network_domain=net)[-1]
result = driver.ex_edit_firewall_rule(
rule=rule, position="BEFORE", relative_rule_for_position=placement_rule.name
)
assert result is True
def test_ex_edit_firewall_rule_source_portlist(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.source.port_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_source_port(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.source.port_list_id = None
rule.source.port_begin = "3"
rule.source.port_end = "10"
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_destination_portlist(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.destination.port_list_id = "802abc9f-45a7-4efb-9d5a-810082368222"
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_destination_port(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
rule.destination.port_list_id = None
rule.destination.port_begin = "3"
rule.destination.port_end = "10"
result = driver.ex_edit_firewall_rule(rule=rule, position="LAST")
assert result is True
def test_ex_edit_firewall_rule_invalid_position_fail(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
with pytest.raises(ValueError):
driver.ex_edit_firewall_rule(rule=rule, position="BEFORE")
def test_ex_edit_firewall_rule_invalid_position_relative_rule_fail(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_firewall_rule(net, "d0a20f59-77b9-4f28-a63b-e58496b73a6c")
relative_rule = driver.ex_list_firewall_rules(network_domain=net)[-1]
with pytest.raises(ValueError):
driver.ex_edit_firewall_rule(
rule=rule, position="FIRST", relative_rule_for_position=relative_rule
)
def test_ex_create_nat_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_create_nat_rule(net, "1.2.3.4", "4.3.2.1")
assert rule.id == "d31c2db0-be6b-4d50-8744-9a7a534b5fba"
def test_ex_list_nat_rules(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rules = driver.ex_list_nat_rules(net)
assert rules[0].id == "2187a636-7ebb-49a1-a2ff-5d617f496dce"
assert rules[0].internal_ip == "10.0.0.15"
assert rules[0].external_ip == "165.180.12.18"
def test_ex_get_nat_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_nat_rule(net, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
assert rule.id == "2187a636-7ebb-49a1-a2ff-5d617f496dce"
assert rule.internal_ip == "10.0.0.16"
assert rule.external_ip == "165.180.12.19"
def test_ex_delete_nat_rule(driver):
net = driver.ex_get_network_domain("8cdfd607-f429-4df6-9352-162cfc0891be")
rule = driver.ex_get_nat_rule(net, "2187a636-7ebb-49a1-a2ff-5d617f496dce")
result = driver.ex_delete_nat_rule(rule)
assert result is True
def test_ex_enable_monitoring(driver):
node = driver.list_nodes()[0]
result = driver.ex_enable_monitoring(node, "ADVANCED")
assert result is True
def test_ex_disable_monitoring(driver):
node = driver.list_nodes()[0]
result = driver.ex_disable_monitoring(node)
assert result is True
def test_ex_change_monitoring_plan(driver):
node = driver.list_nodes()[0]
result = driver.ex_update_monitoring_plan(node, "ESSENTIALS")
assert result is True
def test_ex_add_storage_to_node(driver):
node = driver.list_nodes()[0]
result = driver.ex_add_storage_to_node(30, node, "PERFORMANCE")
assert result is True
def test_ex_remove_storage_from_node(driver):
node = driver.list_nodes()[0]
result = driver.ex_remove_storage_from_node(node, 0)
assert result is True
def test_ex_change_storage_speed(driver):
result = driver.ex_change_storage_speed("1", "PERFORMANCE")
assert result is True
def test_ex_change_storage_size(driver):
result = driver.ex_change_storage_size("1", 100)
assert result is True
def test_ex_clone_node_to_image(driver):
node = driver.list_nodes()[0]
result = driver.ex_clone_node_to_image(node, "my image", "a description")
assert result is True
def test_ex_edit_metadata(driver):
node = driver.list_nodes()[0]
result = driver.ex_edit_metadata(node, "my new name", "a description")
assert result is True
def test_ex_reconfigure_node(driver):
node = driver.list_nodes()[0]
result = driver.ex_reconfigure_node(node, 4, 4, 1, "HIGHPERFORMANCE")
assert result is True
def test_ex_get_location_by_id(driver):
location = driver.ex_get_location_by_id("NA9")
assert location.id == "NA9"
def test_ex_get_location_by_id_NO_LOCATION(driver):
location = driver.ex_get_location_by_id(None)
assert location is None
def test_ex_get_base_image_by_id(driver):
image_id = driver.list_images()[0].id
image = driver.ex_get_base_image_by_id(image_id)
assert image.extra["OS_type"] == "UNIX"
def test_ex_get_customer_image_by_id(driver):
image_id = driver.ex_list_customer_images()[1].id
image = driver.ex_get_customer_image_by_id(image_id)
assert image.extra["OS_type"] == "WINDOWS"
def test_ex_get_image_by_id_base_img(driver):
image_id = driver.list_images()[1].id
image = driver.ex_get_base_image_by_id(image_id)
assert image.extra["OS_type"] == "WINDOWS"
def test_ex_get_image_by_id_customer_img(driver):
image_id = driver.ex_list_customer_images()[0].id
image = driver.ex_get_customer_image_by_id(image_id)
assert image.extra["OS_type"] == "UNIX"
def test_ex_get_image_by_id_customer_FAIL(driver):
image_id = "FAKE_IMAGE_ID"
with pytest.raises(NttCisAPIException):
driver.ex_get_base_image_by_id(image_id)
def test_ex_create_anti_affinity_rule(driver):
node_list = driver.list_nodes()
success = driver.ex_create_anti_affinity_rule([node_list[0], node_list[1]])
assert success is True
def test_ex_create_anti_affinity_rule_TUPLE(driver):
node_list = driver.list_nodes()
success = driver.ex_create_anti_affinity_rule((node_list[0], node_list[1]))
assert success is True
def test_ex_create_anti_affinity_rule_TUPLE_STR(driver):
node_list = driver.list_nodes()
success = driver.ex_create_anti_affinity_rule((node_list[0].id, node_list[1].id))
assert success is True
def test_ex_create_anti_affinity_rule_FAIL_STR(driver):
node_list = "string"
with pytest.raises(TypeError):
driver.ex_create_anti_affinity_rule(node_list)
def test_ex_create_anti_affinity_rule_FAIL_EXISTING(driver):
node_list = driver.list_nodes()
NttCisMockHttp.type = "FAIL_EXISTING"
with pytest.raises(NttCisAPIException):
driver.ex_create_anti_affinity_rule((node_list[0], node_list[1]))
def test_ex_delete_anti_affinity_rule(driver):
net_domain = driver.ex_list_network_domains()[0]
rule = driver.ex_list_anti_affinity_rules(network_domain=net_domain)[0].id
success = driver.ex_delete_anti_affinity_rule(rule)
assert success is True
def test_ex_delete_anti_affinity_rule_STR(driver):
net_domain = driver.ex_list_network_domains()[0]
rule = driver.ex_list_anti_affinity_rules(network_domain=net_domain)[0]
success = driver.ex_delete_anti_affinity_rule(rule.id)
assert success is True
def test_ex_delete_anti_affinity_rule_FAIL(driver):
net_domain = driver.ex_list_network_domains()[0]
rule = driver.ex_list_anti_affinity_rules(network_domain=net_domain)[0]
NttCisMockHttp.type = "FAIL"
with pytest.raises(NttCisAPIException):
driver.ex_delete_anti_affinity_rule(rule.id)
def test_ex_list_anti_affinity_rules_NETWORK_DOMAIN(driver):
net_domain = driver.ex_list_network_domains()[0]
rules = driver.ex_list_anti_affinity_rules(network_domain=net_domain)
assert isinstance(rules, list)
assert len(rules) == 2
assert isinstance(rules[0].id, str)
assert isinstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_NODE(driver):
node = driver.list_nodes()[0]
rules = driver.ex_list_anti_affinity_rules(node=node)
assert isinstance(rules, list)
assert len(rules) == 2
assert isinstance(rules[0].id, str)
assert isinstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_PAGINATED(driver):
net_domain = driver.ex_list_network_domains()[0]
NttCisMockHttp.type = "PAGINATED"
rules = driver.ex_list_anti_affinity_rules(network_domain=net_domain)
assert isinstance(rules, list)
assert len(rules) == 4
assert isinstance(rules[0].id, str)
assert isinstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_ALLFILTERS(driver):
net_domain = driver.ex_list_network_domains()[0]
NttCisMockHttp.type = "ALLFILTERS"
rules = driver.ex_list_anti_affinity_rules(
network_domain=net_domain, filter_id="FAKE_ID", filter_state="FAKE_STATE"
)
assert isinstance(rules, list)
assert len(rules) == 2
assert isinstance(rules[0].id, str)
assert isinstance(rules[0].node_list, list)
def test_ex_list_anti_affinity_rules_BAD_ARGS(driver):
with pytest.raises(ValueError):
driver.ex_list_anti_affinity_rules(
network="fake_network", network_domain="fake_network_domain"
)
def test_ex_create_tag_key(driver):
success = driver.ex_create_tag_key("MyTestKey")
assert success is True
def test_ex_create_tag_key_ALLPARAMS(driver):
driver.connection._get_orgId()
NttCisMockHttp.type = "ALLPARAMS"
success = driver.ex_create_tag_key(
"MyTestKey",
description="Test Key Desc.",
value_required=False,
display_on_report=False,
)
assert success is True
def test_ex_create_tag_key_BADREQUEST(driver):
driver.connection._get_orgId()
NttCisMockHttp.type = "BADREQUEST"
with pytest.raises(NttCisAPIException):
driver.ex_create_tag_key("MyTestKey")
def test_ex_list_tag_keys(driver):
tag_keys = driver.ex_list_tag_keys()
assert isinstance(tag_keys, list)
assert isinstance(tag_keys[0], NttCisTagKey)
assert isinstance(tag_keys[0].id, str)
def test_ex_list_tag_keys_ALLFILTERS(driver):
driver.connection._get_orgId()
NttCisMockHttp.type = "ALLFILTERS"
driver.ex_list_tag_keys(
id="fake_id", name="fake_name", value_required=False, display_on_report=False
)
def test_ex_get_tag_by_id(driver):
tag = driver.ex_get_tag_key_by_id("d047c609-93d7-4bc5-8fc9-732c85840075")
assert isinstance(tag, NttCisTagKey)
def test_ex_get_tag_by_id_NOEXIST(driver):
driver.connection._get_orgId()
NttCisMockHttp.type = "NOEXIST"
with pytest.raises(NttCisAPIException):
driver.ex_get_tag_key_by_id("d047c609-93d7-4bc5-8fc9-732c85840075")
def test_ex_get_tag_by_name(driver):
driver.connection._get_orgId()
NttCisMockHttp.type = "SINGLE"
tag = driver.ex_get_tag_key_by_name("LibcloudTest")
assert isinstance(tag, NttCisTagKey)
def test_ex_get_tag_by_name_NOEXIST(driver):
with pytest.raises(ValueError):
driver.ex_get_tag_key_by_name("LibcloudTest")
def test_ex_modify_tag_key_NAME(driver):
tag_key = driver.ex_list_tag_keys()[0]
NttCisMockHttp.type = "NAME"
success = driver.ex_modify_tag_key(tag_key, name="NewName")
assert success is True
def test_ex_modify_tag_key_NOTNAME(driver):
tag_key = driver.ex_list_tag_keys()[0]
NttCisMockHttp.type = "NOTNAME"
success = driver.ex_modify_tag_key(
tag_key, description="NewDesc", value_required=False, display_on_report=True
)
assert success is True
def test_ex_modify_tag_key_NOCHANGE(driver):
tag_key = driver.ex_list_tag_keys()[0]
NttCisMockHttp.type = "NOCHANGE"
with pytest.raises(NttCisAPIException):
driver.ex_modify_tag_key(tag_key)
def test_ex_remove_tag_key(driver):
tag_key = driver.ex_list_tag_keys()[0]
success = driver.ex_remove_tag_key(tag_key)
assert success is True
def test_ex_remove_tag_key_NOEXIST(driver):
tag_key = driver.ex_list_tag_keys()[0]
NttCisMockHttp.type = "NOEXIST"
with pytest.raises(NttCisAPIException):
driver.ex_remove_tag_key(tag_key)
def test_ex_apply_tag_to_asset(driver):
node = driver.list_nodes()[0]
success = driver.ex_apply_tag_to_asset(node, "TagKeyName", "FakeValue")
assert success is True
def test_ex_apply_tag_to_asset_NOVALUE(driver):
node = driver.list_nodes()[0]
NttCisMockHttp.type = "NOVALUE"
success = driver.ex_apply_tag_to_asset(node, "TagKeyName")
assert success is True
def test_ex_apply_tag_to_asset_NOTAGKEY(driver):
node = driver.list_nodes()[0]
NttCisMockHttp.type = "NOTAGKEY"
with pytest.raises(NttCisAPIException):
driver.ex_apply_tag_to_asset(node, "TagKeyNam")
def test_ex_remove_tag_from_asset(driver):
node = driver.list_nodes()[0]
success = driver.ex_remove_tag_from_asset(node, "TagKeyName")
assert success is True
def test_ex_remove_tag_from_asset_NOTAG(driver):
node = driver.list_nodes()[0]
NttCisMockHttp.type = "NOTAG"
with pytest.raises(NttCisAPIException):
driver.ex_remove_tag_from_asset(node, "TagKeyNam")
def test_ex_list_tags(driver):
tags = driver.ex_list_tags()
assert isinstance(tags, list)
assert isinstance(tags[0], NttCisTag)
assert len(tags) == 3
def test_ex_list_tags_ALLPARAMS(driver):
driver.connection._get_orgId()
NttCisMockHttp.type = "ALLPARAMS"
tags = driver.ex_list_tags(
asset_id="fake_asset_id",
asset_type="fake_asset_type",
location="fake_location",
tag_key_name="fake_tag_key_name",
tag_key_id="fake_tag_key_id",
value="fake_value",
value_required=False,
display_on_report=False,
)
assert isinstance(tags, list)
assert isinstance(tags[0], NttCisTag)
assert len(tags) == 3
def test_list_consistency_groups(driver):
cgs = driver.ex_list_consistency_groups()
assert isinstance(cgs, list)
def test_list_cg_by_src_net_domain(driver):
nd = "f9d6a249-c922-4fa1-9f0f-de5b452c4026"
cgs = driver.ex_list_consistency_groups(source_network_domain_id=nd)
assert cgs[0].name == "sdk_test2_cg"
def test_list_cg_by_name(driver):
NttCisMockHttp.type = "CG_BY_NAME"
name = "sdk_test2_cg"
cg = driver.ex_list_consistency_groups(name=name)
assert cg[0].id == "195a426b-4559-4c79-849e-f22cdf2bfb6e"
def test_get_consistency_group_by_id(driver):
NttCisMockHttp.type = None
cgs = driver.ex_list_consistency_groups()
cg_id = [i for i in cgs if i.name == "sdk_test2_cg"][0].id
cg = driver.ex_get_consistency_group(cg_id)
assert hasattr(cg, "description")
def test_get_drs_snapshots(driver):
NttCisMockHttp.type = None
cgs = driver.ex_list_consistency_groups()
cg_id = [i for i in cgs if i.name == "sdk_test2_cg"][0].id
snaps = driver.ex_list_consistency_group_snapshots(cg_id)
assert hasattr(snaps, "journalUsageGb")
assert isinstance(snaps, ClassFactory)
def test_get_drs_snapshots_by_min_max(driver):
cgs = driver.ex_list_consistency_groups()
cg_id = [i for i in cgs if i.name == "sdk_test2_cg"][0].id
snaps = driver.ex_list_consistency_group_snapshots(
cg_id,
create_time_min="2018-11-28T00:00:00.000Z",
create_time_max="2018-11-29T00:00:00.000Z",
)
for snap in snaps.snapshot:
assert "2018-12" not in snap
def test_expand_drs_journal(driver):
cgs = driver.ex_list_consistency_groups(name="sdk_test2_cg")
cg_id = cgs[0].id
expand_by = "100"
result = driver.ex_expand_journal(cg_id, expand_by)
assert result is True
def test_start_drs_snapshot_preview(driver):
cg_id = "195a426b-4559-4c79-849e-f22cdf2bfb6e"
snapshot_id = "3893"
result = driver.ex_start_drs_failover_preview(cg_id, snapshot_id)
assert result is True
def test_stop_drs_snapshot_preivew(driver):
cg_id = "195a426b-4559-4c79-849e-f22cdf2bfb6e"
result = driver.ex_stop_drs_failover_preview(cg_id)
assert result is True
def test_start_drs_failover_invalid_status(driver):
NttCisMockHttp.type = "INVALID_STATUS"
cg_id = "195a426b-4559-4c79-849e-f22cdf2bfb6e"
with pytest.raises(NttCisAPIException) as excinfo:
driver.ex_initiate_drs_failover(cg_id)
assert "INVALID_STATUS" in excinfo.value.code
def test_initiate_drs_failover(driver):
cg_id = "195a426b-4559-4c79-849e-f22cdf2bfb6e"
result = driver.ex_initiate_drs_failover(cg_id)
assert result is True
def test_create_drs_fail_not_supported(driver):
NttCisMockHttp.type = "FAIL_NOT_SUPPORTED"
src_id = "032f3967-00e4-4780-b4ef-8587460f9dd4"
target_id = "aee58575-38e2-495f-89d3-854e6a886411"
with pytest.raises(NttCisAPIException) as excinfo:
driver.ex_create_consistency_group(
"sdk_cg", "100", src_id, target_id, description="A test consistency group"
)
exception_msg = excinfo.value.msg
assert (
exception_msg
== "DRS is not supported between source Data Center NA9 and target Data Center NA12."
)
def test_create_drs_cg_fail_ineligble(driver):
NttCisMockHttp.type = "FAIL_INELIGIBLE"
src_id = "032f3967-00e4-4780-b4ef-8587460f9dd4"
target_id = "aee58575-38e2-495f-89d3-854e6a886411"
with pytest.raises(NttCisAPIException) as excinfo:
driver.ex_create_consistency_group(
"sdk_test2_cg",
"100",
src_id,
target_id,
description="A test consistency group",
)
exception_msg = excinfo.value.msg
assert (
exception_msg
== "The drsEligible flag for target Server aee58575-38e2-495f-89d3-854e6a886411 must be set."
)
def test_create_drs_cg(driver):
src_id = "032f3967-00e4-4780-b4ef-8587460f9dd4"
target_id = "aee58575-38e2-495f-89d3-854e6a886411"
result = driver.ex_create_consistency_group(
"sdk_test2_cg2",
"100",
src_id,
target_id,
description="A test consistency group",
)
assert result is True
def test_delete_consistency_group(driver):
cg_id = "fad067be-6ca7-495d-99dc-7921c5f2ca5"
result = driver.ex_delete_consistency_group(cg_id)
assert result is True
class InvalidRequestError(Exception):
def __init__(self, tag):
super().__init__("Invalid Request - %s" % tag)
class NttCisMockHttp(MockHttp):
fixtures = ComputeFileFixtures("nttcis")
def _oec_0_9_myaccount_UNAUTHORIZED(self, method, url, body, headers):
return (httplib.UNAUTHORIZED, "", {}, httplib.responses[httplib.UNAUTHORIZED])
def _oec_0_9_myaccount(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_INPROGRESS(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_PAGINATED(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_ALLFILTERS(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_NET_DOMAIN(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_CG_BY_NAME(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_MIN_MAX(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_MIN(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_INVALID_STATUS(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_FAIL_INELIGIBLE(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_FAIL_NOT_SUPPORTED(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _oec_0_9_myaccount_DYNAMIC(self, method, url, body, headers):
body = self.fixtures.load("oec_0_9_myaccount.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_networkWithLocation(
self, method, url, body, headers
):
body = self.fixtures.load("networkWithLocation.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server(self, method, url, body, headers):
body = self.fixtures.load("server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_deleteServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_deleteServer_RESOURCEBUSY.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_rebootServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}rebootServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_rebootServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_rebootServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}rebootServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_rebootServer_RESOURCEBUSY.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server(
self, method, url, body, headers
):
if url.endswith("datacenterId=NA3"):
body = self.fixtures.load("server_server_NA3.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
body = self.fixtures.load("server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGESIZE50(
self, method, url, body, headers
):
if not url.endswith("pageSize=50"):
raise ValueError("pageSize is not set as expected")
body = self.fixtures.load("server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_EMPTY(
self, method, url, body, headers
):
body = self.fixtures.load("server_server_paginated_empty.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGED_THEN_EMPTY(
self, method, url, body, headers
):
if "pageNumber=2" in url:
body = self.fixtures.load("server_server_paginated_empty.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
else:
body = self.fixtures.load("server_server_paginated.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGINATED(
self, method, url, body, headers
):
if "pageNumber=2" in url:
body = self.fixtures.load("server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
else:
body = self.fixtures.load("server_server_paginated.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_PAGINATEDEMPTY(
self, method, url, body, headers
):
body = self.fixtures.load("server_server_paginated_empty.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "datacenterId":
assert value == "fake_loc"
elif key == "networkId":
assert value == "fake_network"
elif key == "networkDomainId":
assert value == "fake_network_domain"
elif key == "vlanId":
assert value == "fake_vlan"
elif key == "ipv6":
assert value == "fake_ipv6"
elif key == "privateIpv4":
assert value == "fake_ipv4"
elif key == "name":
assert value == "fake_name"
elif key == "state":
assert value == "fake_state"
elif key == "started":
assert value == "True"
elif key == "deployed":
assert value == "True"
elif key == "sourceImageId":
assert value == "fake_image"
else:
raise ValueError("Could not find in url parameters {}:{}".format(key, value))
body = self.fixtures.load("server_server.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule(
self, method, url, body, headers
):
body = self.fixtures.load("server_antiAffinityRule_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "id":
assert value == "FAKE_ID"
elif key == "state":
assert value == "FAKE_STATE"
elif key == "pageSize":
assert value == "250"
elif key == "networkDomainId":
pass
else:
raise ValueError("Could not find in url parameters {}:{}".format(key, value))
body = self.fixtures.load("server_antiAffinityRule_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_createAntiAffinityRule(
self, method, url, body, headers
):
body = self.fixtures.load("server_createAntiAffinityRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_antiAffinityRule_PAGINATED(
self, method, url, body, headers
):
if "pageNumber=2" in url:
body = self.fixtures.load("server_antiAffinityRule_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
else:
body = self.fixtures.load("server_antiAffinityRule_list_PAGINATED.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_createAntiAffinityRule_FAIL_EXISTING(
self, method, url, body, headers
):
body = self.fixtures.load("server_createAntiAffinityRule_FAIL.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteAntiAffinityRule(
self, method, url, body, headers
):
body = self.fixtures.load("server_deleteAntiAffinityRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deleteAntiAffinityRule_FAIL(
self, method, url, body, headers
):
body = self.fixtures.load("server_createAntiAffinityRule_FAIL.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_snapshotWindow(
self, method, url, body, headers
):
body = self.fixtures.load("datacenter_snapshotWindows.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_datacenter(
self, method, url, body, headers
):
if url.endswith("id=NA9"):
body = self.fixtures.load("infrastructure_datacenter_NA9.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
body = self.fixtures.load("infrastructure_datacenter.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_infrastructure_datacenter_ALLFILTERS(
self, method, url, body, headers
):
if url.endswith("id=NA9"):
body = self.fixtures.load("infrastructure_datacenter_NA9.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
body = self.fixtures.load("infrastructure_datacenter.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_updateVmwareTools(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}updateVmwareTools":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_updateVmwareTools.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_startServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}startServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_startServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_startServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}startServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_startServer_INPROGRESS.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_shutdownServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}shutdownServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_shutdownServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_shutdownServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}shutdownServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_shutdownServer_INPROGRESS.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_resetServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}resetServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_resetServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_powerOffServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}powerOffServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_powerOffServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_powerOffServer_INPROGRESS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}powerOffServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_powerOffServer_INPROGRESS.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_11_INPROGRESS(
self, method, url, body, headers
):
body = self.fixtures.load("server_GetServer.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain(
self, method, url, body, headers
):
body = self.fixtures.load("network_networkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "datacenterId":
assert value == "fake_location"
elif key == "type":
assert value == "fake_plan"
elif key == "name":
assert value == "fake_name"
elif key == "state":
assert value == "fake_state"
else:
raise ValueError("Could not find in url parameters {}:{}".format(key, value))
body = self.fixtures.load("network_networkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan(
self, method, url, body, headers
):
body = self.fixtures.load("network_vlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "datacenterId":
assert value == "fake_location"
elif key == "networkDomainId":
assert value == "fake_network_domain"
elif key == "ipv6Address":
assert value == "fake_ipv6"
elif key == "privateIpv4Address":
assert value == "fake_ipv4"
elif key == "name":
assert value == "fake_name"
elif key == "state":
assert value == "fake_state"
else:
raise ValueError("Could not find in url parameters {}:{}".format(key, value))
body = self.fixtures.load("network_vlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deployServer":
raise InvalidRequestError(request.tag)
# Make sure the we either have a network tag with an IP or networkId
# Or Network info with a primary nic that has privateip or vlanid
network = request.find(fixxpath("network", TYPES_URN))
network_info = request.find(fixxpath("networkInfo", TYPES_URN))
if network is not None:
if network_info is not None:
raise InvalidRequestError("Request has both MCP1 and MCP2 values")
ipv4 = findtext(network, "privateIpv4", TYPES_URN)
networkId = findtext(network, "networkId", TYPES_URN)
if ipv4 is None and networkId is None:
raise InvalidRequestError(
"Invalid request MCP1 requests need privateIpv4 or networkId"
)
elif network_info is not None:
if network is not None:
raise InvalidRequestError("Request has both MCP1 and MCP2 values")
primary_nic = network_info.find(fixxpath("primaryNic", TYPES_URN))
ipv4 = findtext(primary_nic, "privateIpv4", TYPES_URN)
vlanId = findtext(primary_nic, "vlanId", TYPES_URN)
if ipv4 is None and vlanId is None:
raise InvalidRequestError(
"Invalid request MCP2 requests need privateIpv4 or vlanId"
)
else:
raise InvalidRequestError(
"Invalid request, does not have network or network_info in XML"
)
body = self.fixtures.load("server_deployServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_e75ead52_692f_4314_8725_c8a4f4d13a87(
self, method, url, body, headers
):
body = self.fixtures.load("server_server_e75ead52_692f_4314_8725_c8a4f4d13a87.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deployNetworkDomain(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deployNetworkDomain":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deployNetworkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be(
self, method, url, body, headers
):
body = self.fixtures.load("network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be_ALLFILTERS(
self, method, url, body, headers
):
body = self.fixtures.load("network_networkDomain_8cdfd607_f429_4df6_9352_162cfc0891be.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editNetworkDomain(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editNetworkDomain":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_editNetworkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteNetworkDomain(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteNetworkDomain":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteNetworkDomain.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deployVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deployVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deployVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_vlan_0e56433f_d808_4669_821d_812769517ff8(
self, method, url, body, headers
):
body = self.fixtures.load("network_vlan_0e56433f_d808_4669_821d_812769517ff8.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_editVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_expandVlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}expandVlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_expandVlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_addPublicIpBlock(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}addPublicIpBlock":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_addPublicIpBlock.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock_4487241a_f0ca_11e3_9315_d4bed9b167ba(
self, method, url, body, headers
):
body = self.fixtures.load("network_publicIpBlock_4487241a_f0ca_11e3_9315_d4bed9b167ba.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock(
self, method, url, body, headers
):
body = self.fixtures.load("network_publicIpBlock.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_publicIpBlock_9945dc4a_bdce_11e4_8c14_b8ca3a5d9ef8(
self, method, url, body, headers
):
body = self.fixtures.load("network_publicIpBlock_9945dc4a_bdce_11e4_8c14_b8ca3a5d9ef8.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_removePublicIpBlock(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}removePublicIpBlock":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_removePublicIpBlock.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_firewallRule(
self, method, url, body, headers
):
body = self.fixtures.load("network_firewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createFirewallRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createFirewallRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_createFirewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_firewallRule_d0a20f59_77b9_4f28_a63b_e58496b73a6c(
self, method, url, body, headers
):
body = self.fixtures.load("network_firewallRule_d0a20f59_77b9_4f28_a63b_e58496b73a6c.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editFirewallRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editFirewallRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_editFirewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteFirewallRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteFirewallRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteFirewallRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createNatRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createNatRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_createNatRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_natRule(
self, method, url, body, headers
):
body = self.fixtures.load("network_natRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_natRule_2187a636_7ebb_49a1_a2ff_5d617f496dce(
self, method, url, body, headers
):
body = self.fixtures.load("network_natRule_2187a636_7ebb_49a1_a2ff_5d617f496dce.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteNatRule(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteNatRule":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("network_deleteNatRule.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_addNic(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}addNic":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_addNic.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_removeNic(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}removeNic":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_removeNic.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_disableServerMonitoring(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}disableServerMonitoring":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_disableServerMonitoring.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_enableServerMonitoring(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}enableServerMonitoring":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_enableServerMonitoring.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeServerMonitoringPlan(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}changeServerMonitoringPlan":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_changeServerMonitoringPlan.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_c14b1a46_2428_44c1_9c1a_b20e6418d08c(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_c14b1a46_2428_44c1_9c1a_b20e6418d08c.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_6b4fb0c7_a57b_4f58_b59c_9958f94f971a(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_6b4fb0c7_a57b_4f58_b59c_9958f94f971a.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_osImage_FAKE_IMAGE_ID(
self, method, url, body, headers
):
body = self.fixtures.load("image_osImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage(
self, method, url, body, headers
):
body = self.fixtures.load("image_customerImage.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d(
self, method, url, body, headers
):
body = self.fixtures.load("image_customerImage_5234e5c7_01de_4411_8b6e_baeb8d91cf5d.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c(
self, method, url, body, headers
):
body = self.fixtures.load("image_customerImage_2ffa36c8_1848_49eb_b4fa_9d908775f68c.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_customerImage_FAKE_IMAGE_ID(
self, method, url, body, headers
):
body = self.fixtures.load("image_customerImage_BAD_REQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_reconfigureServer(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}reconfigureServer":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("server_reconfigureServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_cleanServer(
self, method, url, body, headers
):
body = self.fixtures.load("server_cleanServer.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_addDisk(
self, method, url, body, headers
):
body = self.fixtures.load("server_addDisk.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeDiskSpeed(
self, method, url, body, headers
):
body = self.fixtures.load("change_disk_speed.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_expandDisk(
self, method, url, body, headers
):
body = self.fixtures.load("change_disk_size.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_removeDisk(
self, method, url, body, headers
):
body = self.fixtures.load("server_removeDisk.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_editServerMetadata(
self, method, url, body, headers
):
body = self.fixtures.load("server_editServerMetadata.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is None:
raise ValueError("Name must have a value in the request")
if description is not None:
raise ValueError("Default description for a tag should be blank")
if value_required is None or value_required != "true":
raise ValueError("Default valueRequired should be true")
if display_on_report is None or display_on_report != "true":
raise ValueError("Default displayOnReport should be true")
body = self.fixtures.load("tag_createTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey_ALLPARAMS(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}createTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is None:
raise ValueError("Name must have a value in the request")
if description is None:
raise ValueError("Description should have a value")
if value_required is None or value_required != "false":
raise ValueError("valueRequired should be false")
if display_on_report is None or display_on_report != "false":
raise ValueError("displayOnReport should be false")
body = self.fixtures.load("tag_createTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_createTagKey_BADREQUEST(
self, method, url, body, headers
):
body = self.fixtures.load("tag_createTagKey_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey(self, method, url, body, headers):
body = self.fixtures.load("tag_tagKey_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_SINGLE(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tagKey_list_SINGLE.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_ALLFILTERS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "id":
assert value == "fake_id"
elif key == "name":
assert value == "fake_name"
elif key == "valueRequired":
assert value == "false"
elif key == "displayOnReport":
assert value == "false"
elif key == "pageSize":
assert value == "250"
else:
raise ValueError("Could not find in url parameters {}:{}".format(key, value))
body = self.fixtures.load("tag_tagKey_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_d047c609_93d7_4bc5_8fc9_732c85840075(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tagKey_5ab77f5f_5aa9_426f_8459_4eab34e03d54.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tagKey_d047c609_93d7_4bc5_8fc9_732c85840075_NOEXIST(
self, method, url, body, headers
):
body = self.fixtures.load("tag_tagKey_5ab77f5f_5aa9_426f_8459_4eab34e03d54_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NAME(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is None:
raise ValueError("Name must have a value in the request")
if description is not None:
raise ValueError("Description should be empty")
if value_required is not None:
raise ValueError("valueRequired should be empty")
if display_on_report is not None:
raise ValueError("displayOnReport should be empty")
body = self.fixtures.load("tag_editTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NOTNAME(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}editTagKey":
raise InvalidRequestError(request.tag)
name = findtext(request, "name", TYPES_URN)
description = findtext(request, "description", TYPES_URN)
value_required = findtext(request, "valueRequired", TYPES_URN)
display_on_report = findtext(request, "displayOnReport", TYPES_URN)
if name is not None:
raise ValueError("Name should be empty")
if description is None:
raise ValueError("Description should not be empty")
if value_required is None:
raise ValueError("valueRequired should not be empty")
if display_on_report is None:
raise ValueError("displayOnReport should not be empty")
body = self.fixtures.load("tag_editTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_editTagKey_NOCHANGE(
self, method, url, body, headers
):
body = self.fixtures.load("tag_editTagKey_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_deleteTagKey(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}deleteTagKey":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("tag_deleteTagKey.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_deleteTagKey_NOEXIST(
self, method, url, body, headers
):
body = self.fixtures.load("tag_deleteTagKey_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}applyTags":
raise InvalidRequestError(request.tag)
asset_type = findtext(request, "assetType", TYPES_URN)
asset_id = findtext(request, "assetId", TYPES_URN)
tag = request.find(fixxpath("tag", TYPES_URN))
tag_key_name = findtext(tag, "tagKeyName", TYPES_URN)
value = findtext(tag, "value", TYPES_URN)
if asset_type is None:
raise ValueError("assetType should not be empty")
if asset_id is None:
raise ValueError("assetId should not be empty")
if tag_key_name is None:
raise ValueError("tagKeyName should not be empty")
if value is None:
raise ValueError("value should not be empty")
body = self.fixtures.load("tag_applyTags.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags_NOVALUE(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}applyTags":
raise InvalidRequestError(request.tag)
asset_type = findtext(request, "assetType", TYPES_URN)
asset_id = findtext(request, "assetId", TYPES_URN)
tag = request.find(fixxpath("tag", TYPES_URN))
tag_key_name = findtext(tag, "tagKeyName", TYPES_URN)
value = findtext(tag, "value", TYPES_URN)
if asset_type is None:
raise ValueError("assetType should not be empty")
if asset_id is None:
raise ValueError("assetId should not be empty")
if tag_key_name is None:
raise ValueError("tagKeyName should not be empty")
if value is not None:
raise ValueError("value should be empty")
body = self.fixtures.load("tag_applyTags.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_applyTags_NOTAGKEY(
self, method, url, body, headers
):
body = self.fixtures.load("tag_applyTags_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_removeTags(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}removeTags":
raise InvalidRequestError(request.tag)
body = self.fixtures.load("tag_removeTag.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_removeTags_NOTAG(
self, method, url, body, headers
):
body = self.fixtures.load("tag_removeTag_BADREQUEST.xml")
return (httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tag(self, method, url, body, headers):
body = self.fixtures.load("tag_tag_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_tag_tag_ALLPARAMS(
self, method, url, body, headers
):
(_, params) = url.split("?")
parameters = params.split("&")
for parameter in parameters:
(key, value) = parameter.split("=")
if key == "assetId":
assert value == "fake_asset_id"
elif key == "assetType":
assert value == "fake_asset_type"
elif key == "valueRequired":
assert value == "false"
elif key == "displayOnReport":
assert value == "false"
elif key == "pageSize":
assert value == "250"
elif key == "datacenterId":
assert value == "fake_location"
elif key == "value":
assert value == "fake_value"
elif key == "tagKeyName":
assert value == "fake_tag_key_name"
elif key == "tagKeyId":
assert value == "fake_tag_key_id"
else:
raise ValueError("Could not find in url parameters {}:{}".format(key, value))
body = self.fixtures.load("tag_tag_list.xml")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_ipAddressList(
self, method, url, body, headers
):
body = self.fixtures.load("ip_address_lists.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_ipAddressList_FILTERBYNAME(
self, method, url, body, headers
):
body = self.fixtures.load("ip_address_lists_FILTERBYNAME.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createIpAddressList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "createIpAddressList":
raise InvalidRequestError(request.tag)
net_domain = findtext(request, "networkDomainId", TYPES_URN)
if net_domain is None:
raise ValueError("Network Domain should not be empty")
name = findtext(request, "name", TYPES_URN)
if name is None:
raise ValueError("Name should not be empty")
ip_version = findtext(request, "ipVersion", TYPES_URN)
if ip_version is None:
raise ValueError("IP Version should not be empty")
ip_address_col_required = findall(request, "ipAddress", TYPES_URN)
child_ip_address_required = findall(request, "childIpAddressListId", TYPES_URN)
if 0 == len(ip_address_col_required) and 0 == len(child_ip_address_required):
raise ValueError(
"At least one ipAddress element or "
"one childIpAddressListId element must be "
"provided."
)
if ip_address_col_required[0].get("begin") is None:
raise ValueError("IP Address should not be empty")
body = self.fixtures.load("ip_address_list_create.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editIpAddressList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "editIpAddressList":
raise InvalidRequestError(request.tag)
ip_address_list = request.get("id")
if ip_address_list is None:
raise ValueError("IpAddressList ID should not be empty")
name = findtext(request, "name", TYPES_URN)
if name is not None:
raise ValueError("Name should not exists in request")
ip_version = findtext(request, "ipVersion", TYPES_URN)
if ip_version is not None:
raise ValueError("IP Version should not exists in request")
ip_address_col_required = findall(request, "ipAddress", TYPES_URN)
child_ip_address_required = findall(request, "childIpAddressListId", TYPES_URN)
if 0 == len(ip_address_col_required) and 0 == len(child_ip_address_required):
raise ValueError(
"At least one ipAddress element or "
"one childIpAddressListId element must be "
"provided."
)
if ip_address_col_required[0].get("begin") is None:
raise ValueError("IP Address should not be empty")
body = self.fixtures.load("ip_address_list_edit.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deleteIpAddressList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "deleteIpAddressList":
raise InvalidRequestError(request.tag)
ip_address_list = request.get("id")
if ip_address_list is None:
raise ValueError("IpAddressList ID should not be empty")
body = self.fixtures.load("ip_address_list_delete.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_portList(
self, method, url, body, headers
):
body = self.fixtures.load("port_list_lists.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_portList_c8c92ea3_2da8_4d51_8153_f39bec794d69(
self, method, url, body, headers
):
body = self.fixtures.load("port_list_get.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_createPortList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "createPortList":
raise InvalidRequestError(request.tag)
net_domain = findtext(request, "networkDomainId", TYPES_URN)
if net_domain is None:
raise ValueError("Network Domain should not be empty")
ports_required = findall(request, "port", TYPES_URN)
child_port_list_required = findall(request, "childPortListId", TYPES_URN)
if 0 == len(ports_required) and 0 == len(child_port_list_required):
raise ValueError(
"At least one port element or one " "childPortListId element must be provided"
)
if ports_required[0].get("begin") is None:
raise ValueError("PORT begin value should not be empty")
body = self.fixtures.load("port_list_create.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_editPortList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "editPortList":
raise InvalidRequestError(request.tag)
ports_required = findall(request, "port", TYPES_URN)
child_port_list_required = findall(request, "childPortListId", TYPES_URN)
if 0 == len(ports_required) and 0 == len(child_port_list_required):
raise ValueError(
"At least one port element or one " "childPortListId element must be provided"
)
if ports_required[0].get("begin") is None:
raise ValueError("PORT begin value should not be empty")
body = self.fixtures.load("port_list_edit.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_network_deletePortList(
self, method, url, body, headers
):
request = ET.fromstring(body)
if request.tag != "{urn:didata.com:api:cloud:types}" "deletePortList":
raise InvalidRequestError(request.tag)
port_list = request.get("id")
if port_list is None:
raise ValueError("Port List ID should not be empty")
body = self.fixtures.load("ip_address_list_delete.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_cloneServer(
self, method, url, body, headers
):
body = self.fixtures.load("server_clone_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_image_importImage(
self, method, url, body, headers
):
body = self.fixtures.load("import_image_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_exchangeNicVlans(
self, method, url, body, headers
):
body = self.fixtures.load("exchange_nic_vlans_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_changeNetworkAdapter(
self, method, url, body, headers
):
body = self.fixtures.load("change_nic_networkadapter_response.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_deployUncustomizedServer(
self, method, url, body, headers
):
body = self.fixtures.load("deploy_customised_server.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_snapshot_snapshot(
self, method, url, body, headers
):
body = self.fixtures.load("list_server_snapshots.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_snapshot_enableSnapshotService(
self, method, url, body, headers
):
body = self.fixtures.load("enable_snapshot_service.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_snapshot_initiateManualSnapshot(
self, method, url, body, headers
):
body = self.fixtures.load("initiate_manual_snapshot.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_server_server_e1eb7d71_93c9_4b9c_807c_e05932dc8143(
self, method, url, body, headers
):
body = self.fixtures.load("manual_snapshot_server.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_snapshot_createSnapshotPreviewServer(
self, method, url, body, headers
):
body = self.fixtures.load("create_preview_server.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_snapshot_disableSnapshotService(
self, method, url, body, headers
):
body = self.fixtures.load("disable_server_snapshot_service.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_consistencyGroup(
self, method, url, body, headers
):
body = self.fixtures.load("list_consistency_groups.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_consistencyGroup_NET_DOMAIN(
self, method, url, body, headers
):
body = self.fixtures.load("cg_by_src_network_domain.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_consistencyGroup_CG_BY_NAME(
self, method, url, body, headers
):
body = self.fixtures.load("get_cg_by_name_or_id.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_consistencyGroup_195a426b_4559_4c79_849e_f22cdf2bfb6e(
self, method, url, body, headers
):
body = self.fixtures.load("get_cg_by_name_or_id.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_snapshot(
self, method, url, body, headers
):
body = self.fixtures.load("list_drs_snapshots.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_snapshot_MIN_MAX(
self, method, url, body, headers
):
body = self.fixtures.load("drs_snap_shots_by_min_max_time.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_snapshot_MIN(
self, method, url, body, headers
):
body = self.fixtures.load("drs_snap_shots_by_min.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_expandJournal(
self, method, url, body, headers
):
body = self.fixtures.load("drs_expand_journal.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_startPreviewSnapshot(
self, method, url, body, headers
):
body = self.fixtures.load("drs_start_failover_preview.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_stopPreviewSnapshot(
self, method, url, body, headers
):
body = self.fixtures.load("drs_stop_failover_preview.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_initiateFailover_INVALID_STATUS(
self, method, url, body, headers
):
body = self.fixtures.load("drs_invalid_status.xml")
return httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_initiateFailover(
self, method, url, body, headers
):
body = self.fixtures.load("drs_initiate_failover.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_createConsistencyGroup_FAIL_INELIGIBLE(
self, method, url, body, headers
):
body = self.fixtures.load("drs_fail_create_cg_ineligible.xml")
return httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_createConsistencyGroup_FAIL_NOT_SUPPORTED(
self, method, url, body, headers
):
body = self.fixtures.load("drs_fail_create_cg_not_supported.xml")
return httplib.BAD_REQUEST, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_createConsistencyGroup(
self, method, url, body, headers
):
body = self.fixtures.load("drs_create_cg.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
def _caas_2_7_8a8f6abc_2745_4d8a_9cbc_8dabe5a7d0e4_consistencyGroup_deleteConsistencyGroup(
self, method, url, body, headers
):
body = self.fixtures.load("drs_delete_consistency_group.xml")
return httplib.OK, body, {}, httplib.responses[httplib.OK]
if __name__ == "__main__":
sys.exit(unittest.main())