blob: 29dc7749d3dca72596f8ba23137f6ff6ee2623f9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
try:
import simplejson as json
except Exception:
import json
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import assertRaisesRegex
from libcloud.common.types import InvalidCredsError
from libcloud.compute.drivers.cloudsigma import CloudSigmaNodeDriver
from libcloud.compute.drivers.cloudsigma import CloudSigma_2_0_NodeDriver
from libcloud.compute.drivers.cloudsigma import CloudSigmaError
from libcloud.compute.types import NodeState
from libcloud.test import unittest
from libcloud.test import MockHttp
from libcloud.test.file_fixtures import ComputeFileFixtures
class CloudSigmaAPI20BaseTestCase(object):
def setUp(self):
self.driver_klass.connectionCls.conn_class = CloudSigmaMockHttp
CloudSigmaMockHttp.type = None
CloudSigmaMockHttp.use_param = "do"
self.driver = self.driver_klass(*self.driver_args, **self.driver_kwargs)
self.driver.DRIVE_TRANSITION_SLEEP_INTERVAL = 0.1
self.driver.DRIVE_TRANSITION_TIMEOUT = 1
self.node = self.driver.list_nodes()[0]
def test_invalid_api_versions(self):
expected_msg = "Unsupported API version: invalid"
assertRaisesRegex(
self,
NotImplementedError,
expected_msg,
CloudSigmaNodeDriver,
"username",
"password",
api_version="invalid",
)
def test_invalid_credentials(self):
CloudSigmaMockHttp.type = "INVALID_CREDS"
self.assertRaises(InvalidCredsError, self.driver.list_nodes)
def test_invalid_region(self):
expected_msg = "Invalid region:"
assertRaisesRegex(
self,
ValueError,
expected_msg,
CloudSigma_2_0_NodeDriver,
"foo",
"bar",
region="invalid",
)
def test_list_sizes(self):
sizes = self.driver.list_sizes()
size = sizes[0]
self.assertEqual(size.id, "small-1")
def test_list_images(self):
images = self.driver.list_images()
image = images[0]
self.assertEqual(image.name, "ubuntu-10.04-toMP")
self.assertEqual(image.extra["image_type"], "preinst")
self.assertEqual(image.extra["media"], "disk")
self.assertEqual(image.extra["os"], "linux")
def test_list_nodes(self):
nodes = self.driver.list_nodes()
node = nodes[0]
self.assertEqual(len(nodes), 2)
self.assertEqual(node.id, "9de75ed6-fd33-45e2-963f-d405f31fd911")
self.assertEqual(node.name, "test no drives")
self.assertEqual(node.state, NodeState.RUNNING)
self.assertEqual(node.public_ips, ["185.12.5.181", "178.22.68.55"])
self.assertEqual(node.private_ips, [])
def test_create_node(self):
image = self.driver.list_images()[0]
size = self.driver.list_sizes()[0]
metadata = {"foo": "bar"}
node = self.driver.create_node(
name="test node", size=size, image=image, ex_metadata=metadata
)
self.assertEqual(node.name, "test node")
self.assertEqual(len(node.extra["nics"]), 1)
self.assertEqual(node.extra["nics"][0]["ip_v4_conf"]["conf"], "dhcp")
def test_create_node_with_vlan(self):
image = self.driver.list_images()[0]
size = self.driver.list_sizes()[0]
vlan_uuid = "39ae851d-433f-4ac2-a803-ffa24cb1fa3e"
node = self.driver.create_node(
name="test node vlan", size=size, image=image, ex_vlan=vlan_uuid
)
self.assertEqual(node.name, "test node vlan")
self.assertEqual(len(node.extra["nics"]), 2)
self.assertEqual(node.extra["nics"][0]["ip_v4_conf"]["conf"], "dhcp")
self.assertEqual(node.extra["nics"][1]["vlan"]["uuid"], vlan_uuid)
def test_destroy_node(self):
status = self.driver.destroy_node(node=self.node)
self.assertTrue(status)
def test_ex_start_node(self):
status = self.driver.ex_start_node(node=self.node)
self.assertTrue(status)
def test_ex_start_node_avoid_mode(self):
CloudSigmaMockHttp.type = "AVOID_MODE"
ex_avoid = ["1", "2"]
status = self.driver.ex_start_node(node=self.node, ex_avoid=ex_avoid)
self.assertTrue(status)
def test_ex_start_node_already_started(self):
CloudSigmaMockHttp.type = "ALREADY_STARTED"
expected_msg = (
'Cannot start guest in state "started". Guest should '
'be in state "stopped'
)
assertRaisesRegex(
self,
CloudSigmaError,
expected_msg,
self.driver.ex_start_node,
node=self.node,
)
def test_ex_stop_node(self):
status = self.driver.ex_stop_node(node=self.node)
self.assertTrue(status)
def test_ex_stop_node_already_stopped(self):
CloudSigmaMockHttp.type = "ALREADY_STOPPED"
expected_msg = 'Cannot stop guest in state "stopped"'
assertRaisesRegex(
self,
CloudSigmaError,
expected_msg,
self.driver.ex_stop_node,
node=self.node,
)
def test_reboot_node(self):
status = self.driver.reboot_node(node=self.node)
self.assertTrue(status)
def test_ex_get_node(self):
node_id = "3df825cb-9c1b-470d-acbd-03e1a966c046"
node = self.driver.ex_get_node(node_id)
self.assertEqual(node.id, "3df825cb-9c1b-470d-acbd-03e1a966c046")
self.assertEqual(node.name, "test_server")
self.assertEqual(node.state, NodeState.RUNNING)
self.assertEqual(node.public_ips, ["162.213.38.202"])
self.assertEqual(node.private_ips, [])
def test_ex_clone_node(self):
node_to_clone = self.driver.list_nodes()[0]
cloned_node = self.driver.ex_clone_node(
node=node_to_clone, name="test cloned node"
)
self.assertEqual(cloned_node.name, "test cloned node")
def test_ex_open_vnc_tunnel(self):
node = self.driver.list_nodes()[0]
vnc_url = self.driver.ex_open_vnc_tunnel(node=node)
self.assertEqual(vnc_url, "vnc://direct.lvs.cloudsigma.com:41111")
def test_ex_close_vnc_tunnel(self):
node = self.driver.list_nodes()[0]
status = self.driver.ex_close_vnc_tunnel(node=node)
self.assertTrue(status)
def test_ex_list_library_drives(self):
drives = self.driver.ex_list_library_drives()
drive = drives[0]
self.assertEqual(drive.name, "IPCop 2.0.2")
self.assertEqual(drive.size, 1)
self.assertEqual(drive.media, "cdrom")
self.assertEqual(drive.status, "unmounted")
def test_ex_list_user_drives(self):
drives = self.driver.ex_list_user_drives()
drive = drives[0]
self.assertEqual(drive.name, "test node 2-drive")
self.assertEqual(drive.size, 13)
self.assertEqual(drive.media, "disk")
self.assertEqual(drive.status, "unmounted")
def test_ex_create_drive(self):
CloudSigmaMockHttp.type = "CREATE"
name = "test drive 5"
size = 2000 * 1024 * 1024
drive = self.driver.ex_create_drive(name=name, size=size, media="disk")
self.assertEqual(drive.name, "test drive 5")
self.assertEqual(drive.media, "disk")
def test_ex_clone_drive(self):
drive = self.driver.ex_list_user_drives()[0]
cloned_drive = self.driver.ex_clone_drive(drive=drive, name="cloned drive")
self.assertEqual(cloned_drive.name, "cloned drive")
def test_ex_resize_drive(self):
drive = self.driver.ex_list_user_drives()[0]
size = 10
resized_drive = self.driver.ex_resize_drive(drive=drive, size=size)
self.assertEqual(resized_drive.name, "test drive 5")
self.assertEqual(resized_drive.media, "disk")
self.assertEqual(resized_drive.size, size)
def test_ex_list_firewall_policies(self):
policies = self.driver.ex_list_firewall_policies()
policy = policies[1]
rule = policy.rules[0]
self.assertEqual(policy.name, "My awesome policy")
self.assertEqual(rule.action, "drop")
self.assertEqual(rule.direction, "out")
self.assertEqual(rule.dst_ip, "23.0.0.0/32")
self.assertEqual(rule.ip_proto, "tcp")
self.assertIsNone(rule.dst_port)
self.assertIsNone(rule.src_ip)
self.assertIsNone(rule.src_port)
self.assertEqual(
rule.comment, "Drop traffic from the VM to IP address 23.0.0.0/32"
)
def test_ex_create_firewall_policy_no_rules(self):
CloudSigmaMockHttp.type = "CREATE_NO_RULES"
policy = self.driver.ex_create_firewall_policy(name="test policy 1")
self.assertEqual(policy.name, "test policy 1")
self.assertEqual(policy.rules, [])
def test_ex_create_firewall_policy_with_rules(self):
CloudSigmaMockHttp.type = "CREATE_WITH_RULES"
rules = [
{
"action": "accept",
"direction": "out",
"ip_proto": "tcp",
"src_ip": "127.0.0.1",
"dst_ip": "127.0.0.1",
}
]
policy = self.driver.ex_create_firewall_policy(
name="test policy 2", rules=rules
)
rule = policy.rules[0]
self.assertEqual(policy.name, "test policy 2")
self.assertEqual(len(policy.rules), 1)
self.assertEqual(rule.action, "accept")
self.assertEqual(rule.direction, "out")
self.assertEqual(rule.ip_proto, "tcp")
def test_ex_attach_firewall_policy(self):
policy = self.driver.ex_list_firewall_policies()[0]
node = self.driver.list_nodes()[0]
CloudSigmaMockHttp.type = "ATTACH_POLICY"
updated_node = self.driver.ex_attach_firewall_policy(policy=policy, node=node)
nic = updated_node.extra["nics"][0]
self.assertEqual(
nic["firewall_policy"]["uuid"], "461dfb8c-e641-43d7-a20e-32e2aa399086"
)
def test_ex_attach_firewall_policy_inexistent_nic(self):
policy = self.driver.ex_list_firewall_policies()[0]
node = self.driver.list_nodes()[0]
nic_mac = "inexistent"
expected_msg = "Cannot find the NIC interface to attach a policy to"
assertRaisesRegex(
self,
ValueError,
expected_msg,
self.driver.ex_attach_firewall_policy,
policy=policy,
node=node,
nic_mac=nic_mac,
)
def test_ex_delete_firewall_policy(self):
policy = self.driver.ex_list_firewall_policies()[0]
status = self.driver.ex_delete_firewall_policy(policy=policy)
self.assertTrue(status)
def test_ex_list_tags(self):
tags = self.driver.ex_list_tags()
tag = tags[0]
self.assertEqual(tag.id, "a010ec41-2ead-4630-a1d0-237fa77e4d4d")
self.assertEqual(tag.name, "test tag 2")
def test_ex_get_tag(self):
tag = self.driver.ex_get_tag(tag_id="a010ec41-2ead-4630-a1d0-237fa77e4d4d")
self.assertEqual(tag.id, "a010ec41-2ead-4630-a1d0-237fa77e4d4d")
self.assertEqual(tag.name, "test tag 2")
def test_ex_create_tag(self):
tag = self.driver.ex_create_tag(name="test tag 3")
self.assertEqual(tag.name, "test tag 3")
def test_ex_create_tag_with_resources(self):
CloudSigmaMockHttp.type = "WITH_RESOURCES"
resource_uuids = ["1"]
tag = self.driver.ex_create_tag(
name="test tag 3", resource_uuids=resource_uuids
)
self.assertEqual(tag.name, "test tag 3")
self.assertEqual(tag.resources, resource_uuids)
def test_ex_tag_resource(self):
node = self.driver.list_nodes()[0]
tag = self.driver.ex_list_tags()[0]
updated_tag = self.driver.ex_tag_resource(resource=node, tag=tag)
self.assertEqual(updated_tag.name, "test tag 3")
def test_ex_tag_resources(self):
nodes = self.driver.list_nodes()
tag = self.driver.ex_list_tags()[0]
updated_tag = self.driver.ex_tag_resources(resources=nodes, tag=tag)
self.assertEqual(updated_tag.name, "test tag 3")
def test_ex_tag_resource_invalid_resource_object(self):
tag = self.driver.ex_list_tags()[0]
expected_msg = "Resource doesn't have id attribute"
assertRaisesRegex(
self,
ValueError,
expected_msg,
self.driver.ex_tag_resource,
tag=tag,
resource={},
)
def test_ex_delete_tag(self):
tag = self.driver.ex_list_tags()[0]
status = self.driver.ex_delete_tag(tag=tag)
self.assertTrue(status)
def test_ex_get_balance(self):
balance = self.driver.ex_get_balance()
self.assertEqual(balance["balance"], "10.00")
self.assertEqual(balance["currency"], "USD")
def test_ex_get_pricing(self):
pricing = self.driver.ex_get_pricing()
self.assertTrue("current" in pricing)
self.assertTrue("next" in pricing)
self.assertTrue("objects" in pricing)
def test_ex_get_usage(self):
pricing = self.driver.ex_get_usage()
self.assertTrue("balance" in pricing)
self.assertTrue("usage" in pricing)
def test_ex_list_subscriptions(self):
subscriptions = self.driver.ex_list_subscriptions()
self.assertEqual(len(subscriptions), 6)
subscription = subscriptions[0]
self.assertEqual(subscription.id, "7272")
self.assertEqual(subscription.resource, "vlan")
self.assertEqual(subscription.amount, 1)
self.assertEqual(subscription.period, "345 days, 0:00:00")
self.assertEqual(subscription.status, "active")
self.assertEqual(subscription.price, "0E-20")
subscription = subscriptions[-1]
self.assertEqual(subscription.id, "5555")
self.assertEqual(subscription.start_time, None)
self.assertEqual(subscription.end_time, None)
def test_ex_create_subscription(self):
CloudSigmaMockHttp.type = "CREATE_SUBSCRIPTION"
subscription = self.driver.ex_create_subscription(
amount=1, period="1 month", resource="vlan"
)
self.assertEqual(subscription.amount, 1)
self.assertEqual(subscription.period, "1 month")
self.assertEqual(subscription.resource, "vlan")
self.assertEqual(subscription.price, "10.26666666666666666666666667")
self.assertEqual(subscription.auto_renew, False)
self.assertEqual(
subscription.subscribed_object, "2494079f-8376-40bf-9b37-34d633b8a7b7"
)
def test_ex_list_subscriptions_status_filterting(self):
CloudSigmaMockHttp.type = "STATUS_FILTER"
self.driver.ex_list_subscriptions(status="active")
def test_ex_list_subscriptions_resource_filterting(self):
CloudSigmaMockHttp.type = "RESOURCE_FILTER"
resources = ["cpu", "mem"]
self.driver.ex_list_subscriptions(resources=resources)
def test_ex_toggle_subscription_auto_renew(self):
subscription = self.driver.ex_list_subscriptions()[0]
status = self.driver.ex_toggle_subscription_auto_renew(
subscription=subscription
)
self.assertTrue(status)
def test_ex_list_capabilities(self):
capabilities = self.driver.ex_list_capabilities()
self.assertEqual(capabilities["servers"]["cpu"]["min"], 250)
def test_ex_list_servers_availability_groups(self):
groups = self.driver.ex_list_servers_availability_groups()
self.assertEqual(len(groups), 3)
self.assertEqual(len(groups[0]), 2)
self.assertEqual(len(groups[2]), 1)
def test_ex_list_drives_availability_groups(self):
groups = self.driver.ex_list_drives_availability_groups()
self.assertEqual(len(groups), 1)
self.assertEqual(len(groups[0]), 11)
def test_ex_attach_drive(self):
node_id = "3df825cb-9c1b-470d-acbd-03e1a966c046"
node = self.driver.ex_get_node(node_id)
drives = self.driver.ex_list_user_drives()
drive = drives[0]
response = self.driver.ex_attach_drive(node, drive)
self.assertTrue(response)
def test_ex_detach_drive(self):
node_id = "3df825cb-9c1b-470d-acbd-03e1a966c046"
node = self.driver.ex_get_node(node_id)
drives = self.driver.ex_list_user_drives()
drive = drives[0]
response = self.driver.ex_detach_drive(node, drive)
self.assertTrue(response)
def test_wait_for_drive_state_transition_timeout(self):
drive = self.driver.ex_list_user_drives()[0]
state = "timeout"
expected_msg = "Timed out while waiting for drive transition"
assertRaisesRegex(
self,
Exception,
expected_msg,
self.driver._wait_for_drive_state_transition,
drive=drive,
state=state,
timeout=0.5,
)
def test_wait_for_drive_state_transition_success(self):
drive = self.driver.ex_list_user_drives()[0]
state = "unmounted"
drive = self.driver._wait_for_drive_state_transition(
drive=drive, state=state, timeout=0.5
)
self.assertEqual(drive.status, state)
def test_list_key_pairs(self):
keys = self.driver.list_key_pairs()
key = keys[0]
self.assertEqual(len(keys), 2)
self.assertEqual(key.name, "ala")
self.assertEqual(
key.fingerprint, "8c:71:a2:f0:bc:a4:45:66:a8:59:77:6d:80:d5:a0:89"
)
self.assertEqual(key.extra["uuid"], "186106ac-afb5-40e5-a0de-6f0feba5a3d5")
def test_import_key_pair_from_string(self):
public_key = self.driver.list_key_pairs()[0].public_key
key = self.driver.import_key_pair_from_string("test_key", public_key)
self.assertEqual(key.name, "test_key")
self.assertIsNone(key.private_key)
self.assertEqual(key.extra["uuid"], "6d7db550-d1c5-4e3b-9c0a-e83f6a2b8244")
self.assertEqual(key.public_key, public_key)
def test_get_key_pair(self):
uuid = self.driver.list_key_pairs()[0].extra["uuid"]
key = self.driver.get_key_pair(uuid)
self.assertEqual(key.name, "ala")
self.assertEqual(key.extra["uuid"], "186106ac-afb5-40e5-a0de-6f0feba5a3d5")
def test_delete_key_pair(self):
key = self.driver.list_key_pairs()[0]
response = self.driver.delete_key_pair(key)
self.assertTrue(response)
class CloudSigmaAPI20DirectTestCase(CloudSigmaAPI20BaseTestCase, unittest.TestCase):
driver_klass = CloudSigma_2_0_NodeDriver
driver_args = ("foo", "bar")
driver_kwargs = {}
class CloudSigmaAPI20IndirectTestCase(CloudSigmaAPI20BaseTestCase, unittest.TestCase):
driver_klass = CloudSigmaNodeDriver
driver_args = ("foo", "bar")
driver_kwargs = {"api_version": "2.0"}
class CloudSigmaMockHttp(MockHttp, unittest.TestCase):
fixtures = ComputeFileFixtures("cloudsigma_2_0")
def _api_2_0_servers_detail_INVALID_CREDS(self, method, url, body, headers):
body = self.fixtures.load("libdrives.json")
return (httplib.UNAUTHORIZED, body, {}, httplib.responses[httplib.UNAUTHORIZED])
def _api_2_0_libdrives(self, method, url, body, headers):
body = self.fixtures.load("libdrives.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_servers_detail(self, method, url, body, headers):
body = self.fixtures.load("servers_detail_mixed_state.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911(
self, method, url, body, headers
):
body = ""
return (httplib.NO_CONTENT, body, {}, httplib.responses[httplib.NO_CONTENT])
def _api_2_0_servers(self, method, url, body, headers):
if method == "POST":
# create_node
parsed = json.loads(body)
if "vlan" in parsed["name"]:
self.assertEqual(len(parsed["nics"]), 2)
body = self.fixtures.load("servers_create_with_vlan.json")
else:
body = self.fixtures.load("servers_create.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_start(
self, method, url, body, headers
):
body = self.fixtures.load("start_success.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_AVOID_MODE_start(
self, method, url, body, headers
):
self.assertUrlContainsQueryParams(url, {"avoid": "1,2"})
body = self.fixtures.load("start_success.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_ALREADY_STARTED_start(
self, method, url, body, headers
):
body = self.fixtures.load("start_already_started.json")
return (httplib.FORBIDDEN, body, {}, httplib.responses[httplib.FORBIDDEN])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_stop(
self, method, url, body, headers
):
body = self.fixtures.load("stop_success.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_ALREADY_STOPPED_stop(
self, method, url, body, headers
):
body = self.fixtures.load("stop_already_stopped.json")
return (httplib.FORBIDDEN, body, {}, httplib.responses[httplib.FORBIDDEN])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_clone(
self, method, url, body, headers
):
body = self.fixtures.load("servers_clone.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_open_vnc(
self, method, url, body, headers
):
body = self.fixtures.load("servers_open_vnc.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_action_close_vnc(
self, method, url, body, headers
):
body = self.fixtures.load("servers_close_vnc.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_servers_3df825cb_9c1b_470d_acbd_03e1a966c046(
self, method, url, body, headers
):
if method == "GET":
body = self.fixtures.load("servers_get.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
elif method == "PUT":
body = ""
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_detail(self, method, url, body, headers):
body = self.fixtures.load("drives_detail.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_b02311e2_a83c_4c12_af10_b30d51c86913(
self, method, url, body, headers
):
body = self.fixtures.load("drives_get.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_9d1d2cf3_08c1_462f_8485_f4b073560809(
self, method, url, body, headers
):
body = self.fixtures.load("drives_get.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_CREATE(self, method, url, body, headers):
body = self.fixtures.load("drives_create.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_drives_9d1d2cf3_08c1_462f_8485_f4b073560809_action_clone(
self, method, url, body, headers
):
body = self.fixtures.load("drives_clone.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_5236b9ee_f735_42fd_a236_17558f9e12d3_action_clone(
self, method, url, body, headers
):
body = self.fixtures.load("drives_clone.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_b02311e2_a83c_4c12_af10_b30d51c86913_action_resize(
self, method, url, body, headers
):
body = self.fixtures.load("drives_resize.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_drives_9d1d2cf3_08c1_462f_8485_f4b073560809_action_resize(
self, method, url, body, headers
):
body = self.fixtures.load("drives_resize.json")
return (httplib.ACCEPTED, body, {}, httplib.responses[httplib.ACCEPTED])
def _api_2_0_fwpolicies_detail(self, method, url, body, headers):
body = self.fixtures.load("fwpolicies_detail.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_fwpolicies_CREATE_NO_RULES(self, method, url, body, headers):
body = self.fixtures.load("fwpolicies_create_no_rules.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_fwpolicies_CREATE_WITH_RULES(self, method, url, body, headers):
body = self.fixtures.load("fwpolicies_create_with_rules.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_servers_9de75ed6_fd33_45e2_963f_d405f31fd911_ATTACH_POLICY(
self, method, url, body, headers
):
body = self.fixtures.load("servers_attach_policy.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_fwpolicies_0e339282_0cb5_41ac_a9db_727fb62ff2dc(
self, method, url, body, headers
):
if method == "DELETE":
body = ""
return (httplib.NO_CONTENT, body, {}, httplib.responses[httplib.NO_CONTENT])
def _api_2_0_tags_detail(self, method, url, body, headers):
body = self.fixtures.load("tags_detail.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_tags(self, method, url, body, headers):
if method == "POST":
body = self.fixtures.load("tags_create.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_tags_WITH_RESOURCES(self, method, url, body, headers):
if method == "POST":
body = self.fixtures.load("tags_create_with_resources.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_tags_a010ec41_2ead_4630_a1d0_237fa77e4d4d(
self, method, url, body, headers
):
if method == "GET":
# ex_get_tag
body = self.fixtures.load("tags_get.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
elif method == "PUT":
# ex_tag_resource
body = self.fixtures.load("tags_update.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
elif method == "DELETE":
# ex_delete_tag
body = ""
return (httplib.NO_CONTENT, body, {}, httplib.responses[httplib.NO_CONTENT])
def _api_2_0_balance(self, method, url, body, headers):
body = self.fixtures.load("balance.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_pricing(self, method, url, body, headers):
body = self.fixtures.load("pricing.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_currentusage(self, method, url, body, headers):
body = self.fixtures.load("currentusage.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions(self, method, url, body, headers):
body = self.fixtures.load("subscriptions.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_STATUS_FILTER(self, method, url, body, headers):
self.assertUrlContainsQueryParams(url, {"status": "active"})
body = self.fixtures.load("subscriptions.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_RESOURCE_FILTER(self, method, url, body, headers):
expected_params = {"resource": "cpu,mem", "status": "all"}
self.assertUrlContainsQueryParams(url, expected_params)
body = self.fixtures.load("subscriptions.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_7272_action_auto_renew(self, method, url, body, headers):
body = ""
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_subscriptions_CREATE_SUBSCRIPTION(self, method, url, body, headers):
body = self.fixtures.load("create_subscription.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_capabilities(self, method, url, body, headers):
body = self.fixtures.load("capabilities.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_servers_availability_groups(self, method, url, body, headers):
body = self.fixtures.load("servers_avail_groups.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_drives_availability_groups(self, method, url, body, headers):
body = self.fixtures.load("drives_avail_groups.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def _api_2_0_keypairs(self, method, url, body, headers):
if method == "GET":
body = self.fixtures.load("keypairs_list.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
elif method == "POST":
body = self.fixtures.load("keypairs_import.json")
return (httplib.CREATED, body, {}, httplib.responses[httplib.CREATED])
def _api_2_0_keypairs_186106ac_afb5_40e5_a0de_6f0feba5a3d5(
self, method, url, body, headers
):
if method == "GET":
body = self.fixtures.load("keypairs_get.json")
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
if method == "DELETE":
body = ""
return (httplib.NO_CONTENT, body, {}, httplib.responses[httplib.NO_CONTENT])
if __name__ == "__main__":
sys.exit(unittest.main())