| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import sys |
| |
| from libcloud.test import MockHttp, unittest |
| from libcloud.utils.py3 import httplib |
| from libcloud.compute.base import NodeImage, NodeLocation, NodeAuthPassword |
| from libcloud.test.secrets import ONEANDONE_PARAMS |
| from libcloud.compute.types import Provider, NodeState |
| from libcloud.compute.providers import get_driver |
| from libcloud.test.file_fixtures import ComputeFileFixtures |
| |
| |
class OneAndOneTests(unittest.TestCase):
    """Tests for the 1&1 compute driver, answered from canned fixture responses.

    ``setUp`` replaces the driver's connection class with ``OneAndOneMockHttp``,
    so the driver calls below are served by that mock class rather than by the
    real service.
    """

    def setUp(self):
        # Swap in the mock HTTP class before the driver is instantiated.
        oneAndOne = get_driver(Provider.ONEANDONE)
        oneAndOne.connectionCls.conn_class = OneAndOneMockHttp
        self.driver = oneAndOne(ONEANDONE_PARAMS)

    @staticmethod
    def _monitoring_thresholds():
        """Return a fresh thresholds payload used by the monitoring policy tests."""

        def levels(warning, critical):
            return {
                "warning": {"value": warning, "alert": False},
                "critical": {"value": critical, "alert": False},
            }

        return {
            "cpu": levels(90, 95),
            "ram": levels(90, 95),
            "disk": levels(80, 90),
            "transfer": levels(1000, 2000),
            "internal_ping": levels(50, 100),
        }

    # ------------------------------------------------------------------
    # Function tests for listing items
    # ------------------------------------------------------------------

    def test_list_sizes(self):
        sizes = self.driver.list_sizes()
        self.assertEqual(len(sizes), 4)

    def test_list_locations(self):
        locations = self.driver.list_locations()
        self.assertEqual(len(locations), 4)

    def test_list_images(self):
        appliances = self.driver.list_images("IMAGE")
        self.assertEqual(len(appliances), 102)

    def test_get_image(self):
        appliance = self.driver.get_image("img_1")
        self.assertIsNotNone(appliance)

    def test_list_nodes(self):
        # (id, name, public IPs) of every node, in the order they are returned.
        expected = [
            (
                "8A7D5122BDC173B6E52223878CEF2748",
                "Docs Content Ubuntu 16.04-1",
                ["50.21.182.126"],
            ),
            (
                "E7D36EC025C73796035BF4F171379025",
                "Docs Content Test Server: CentOS 7-1",
                ["62.151.179.99"],
            ),
            (
                "DDDC4CCA34AAB08132FA1E40F9FEAC25",
                "App Dev Server 5",
                ["70.35.206.196"],
            ),
            (
                "D5C5C1D01249DE9B88BE3DAE973AA090",
                "Docs Test Server: CentOS 7-2",
                ["74.208.88.88"],
            ),
            (
                "FB1765588A90364835782061CE48BA8E",
                "Docs Content Test Server Ubuntu 16.04-2",
                ["70.35.206.233"],
            ),
        ]

        nodes = self.driver.list_nodes()
        # The length check guards the zip() below against silently skipping nodes.
        self.assertEqual(len(nodes), 5)

        for node, (node_id, name, public_ips) in zip(nodes, expected):
            self.assertEqual(node.id, node_id)
            self.assertEqual(node.name, name)
            self.assertEqual(node.state, NodeState.RUNNING)
            self.assertEqual(node.public_ips, public_ips)
            self.assertEqual(node.private_ips, [])

    # ------------------------------------------------------------------
    # Servers
    # ------------------------------------------------------------------

    def test_create_node(self):
        node = self.driver.create_node(
            name="name",
            image=NodeImage(id="image_id", name=None, driver=self.driver),
            ex_fixed_instance_size_id="instance_id",
            location=NodeLocation("datacenter_id", name="name", country="GB", driver=self.driver),
            auth=NodeAuthPassword("password"),
            ex_ip="1.1.1.1",
            ex_monitoring_policy_id="mp_id",
            ex_firewall_policy_id="fw_id",
            ex_loadbalancer_id="lb_id",
            ex_description="description",
            ex_power_on="SHUTDOWN",
        )

        self.assertEqual(node.id, "E7D36EC025C73796035BF4F171379025")
        self.assertEqual(node.name, "Docs Content Test Server: CentOS 7-1")

        extra = node.extra
        self.assertEqual(extra["description"], "My server description")

        self.assertIsNone(extra["status"]["percent"])
        self.assertEqual(extra["status"]["state"], "POWERED_ON")

        self.assertEqual(extra["image"]["id"], "B5F778B85C041347BCDCFC3172AB3F3C")
        self.assertEqual(extra["image"]["name"], "centos7-64std")

        self.assertEqual(extra["datacenter"]["id"], "908DC2072407C94C8054610AD5A53B8C")
        self.assertEqual(extra["datacenter"]["country_code"], "US")
        self.assertEqual(extra["datacenter"]["location"], "United States of America")

        hardware = extra["hardware"]
        self.assertEqual(hardware["fixed_instance_size_id"], "65929629F35BBFBA63022008F773F3EB")
        self.assertEqual(hardware["vcore"], 1)
        self.assertEqual(hardware["cores_per_processor"], 1)
        self.assertEqual(hardware["ram"], 1)

        main_hdd = hardware["hdds"][0]
        self.assertEqual(main_hdd["id"], "CDB278D95A92CB4C379A9CAAD6759F02")
        self.assertEqual(main_hdd["size"], 40)
        self.assertEqual(main_hdd["is_main"], True)

        self.assertEqual(extra["ips"][0]["id"], "FDBE99EDD57F8596CBF71B6B64BD0A92")
        self.assertEqual(extra["ips"][0]["ip"], "62.151.179.99")

        self.assertIsNone(extra["monitoring_policy"])
        self.assertEqual(extra["alerts"], [])
        self.assertIsNone(extra["snapshot"])
        self.assertIsNone(extra["dvd"])
        self.assertIsNone(extra["private_networks"])

    def test_ex_list_datacenters(self):
        datacenters = self.driver.ex_list_datacenters()
        self.assertEqual(len(datacenters), 4)

    def test_ex_shutdown_server(self):
        server = self.driver.ex_shutdown_server("srv_1")
        self.assertIsNotNone(server)

    def test_reboot_node(self):
        node = self.driver.ex_get_server("srv_1")
        rebooted = self.driver.reboot_node(node)
        self.assertIsNotNone(rebooted)

    def test_ex_get_server(self):
        server = self.driver.ex_get_server("srv_1")
        self.assertIsNotNone(server)

    def test_destroy_node(self):
        server = self.driver.ex_get_server("srv_1")
        node = self.driver.destroy_node(server)
        self.assertIsNotNone(node)

    def test_get_server_hardware(self):
        hardware = self.driver.ex_get_server_hardware("srv_1")
        self.assertIsNotNone(hardware)
        self.assertEqual(hardware["vcore"], 1)
        self.assertEqual(hardware["cores_per_processor"], 1)
        self.assertEqual(hardware["ram"], 2)
        self.assertEqual(hardware["fixed_instance_size_id"], 0)
        self.assertIsNotNone(hardware["hdds"])

        main_hdd = hardware["hdds"][0]
        self.assertEqual(main_hdd["id"], "8C626C1A7005D0D1F527143C413D461E")
        self.assertEqual(main_hdd["is_main"], True)
        self.assertEqual(main_hdd["size"], 40)

    def test_rename_server(self):
        server = self.driver.ex_rename_server("srv_1", "name")
        self.assertIsNotNone(server)

    def test_ex_modify_server_hardware(self):
        node = self.driver.ex_modify_server_hardware("srv_1", vcore=1)
        self.assertIsNotNone(node)

    def test_add_hdd(self):
        node = self.driver.ex_add_hdd("srv_1", 1, True)
        self.assertIsNotNone(node)

    def test_modify_hdd(self):
        # Previously this called ex_modify_server_hardware with HDD arguments,
        # so the HDD endpoint (and its PUT mock handler) was never exercised.
        node = self.driver.ex_modify_server_hdd("srv_1", "hdd_id", 50)
        self.assertIsNotNone(node)

    def test_remove_hdd(self):
        node = self.driver.ex_remove_hdd("srv_1", "hdd_id")
        self.assertIsNotNone(node)

    def test_ex_get_server_image(self):
        image = self.driver.ex_get_server_image("srv_1")
        self.assertIsNotNone(image)
        self.assertEqual(image["id"], "76EBF29C1250167C8754B2B3D1C05F68")
        self.assertEqual(image["name"], "centos7-64std")

    def test_ex_reinstall_server_image(self):
        node = self.driver.ex_reinstall_server_image("srv_1", "img_id", "password")
        self.assertIsNotNone(node)

    def test_ex_list_server_ips(self):
        ips = self.driver.ex_list_server_ips("srv_1")
        self.assertEqual(len(ips), 1)

    def test_ex_get_server_ip(self):
        ip = self.driver.ex_get_server_ip("srv_1", "ip_id")
        self.assertIsNotNone(ip)

    def test_ex_assign_server(self):
        # "IPV$" in the original was a typo for the IP type.
        node = self.driver.ex_assign_server_ip("srv_1", "IPV4")
        self.assertIsNotNone(node)

    def test_ex_remove_server_ip(self):
        node = self.driver.ex_remove_server_ip("srv_1", "ip_id", keep_ip=True)
        self.assertIsNotNone(node)

    # ------------------------------------------------------------------
    # Firewall policies
    # ------------------------------------------------------------------

    def test_ex_create_firewall_policy(self):
        rules = [
            {"protocol": "TCP", "port_from": 80, "port_to": 80, "source": "0.0.0.0"},
            {"protocol": "TCP", "port_from": 443, "port_to": 443, "source": "0.0.0.0"},
        ]
        firewall = self.driver.ex_create_firewall_policy("name", rules, description="desc")
        self.assertIsNotNone(firewall)

    def test_ex_list_firewall_policies(self):
        policies = self.driver.ex_list_firewall_policies()
        self.assertIsNotNone(policies)
        self.assertEqual(len(policies), 2)

    def test_ex_get_firewall_policy(self):
        firewall = self.driver.ex_get_firewall_policy("fw_id")
        self.assertIsNotNone(firewall)

    def test_ex_delete_firewall_policy(self):
        firewall = self.driver.ex_delete_firewall_policy("fw_id")
        self.assertIsNotNone(firewall)

    def test_ex_get_server_firewall_policies(self):
        firewall = self.driver.ex_get_server_firewall_policies("srv_id", "ip_id")
        self.assertIsNotNone(firewall)

    def test_ex_add_server_firewall_policy(self):
        node = self.driver.ex_add_server_firewall_policy("srv_id", "ip_id", "fw_id")
        self.assertIsNotNone(node)

    # ------------------------------------------------------------------
    # Shared storages
    # ------------------------------------------------------------------

    def test_ex_list_shared_storages(self):
        storages = self.driver.ex_list_shared_storages()
        self.assertEqual(len(storages), 3)

    def test_ex_get_shared_storage(self):
        storage = self.driver.ex_get_shared_storage("storage_1")

        self.assertIsNotNone(storage)
        self.assertEqual(storage["id"], "6AD2F180B7B666539EF75A02FE227084")
        self.assertEqual(storage["size"], 200)
        self.assertEqual(storage["state"], "ACTIVE")
        self.assertEqual(storage["description"], "My shared storage test description")
        self.assertEqual(storage["datacenter"]["id"], "D0F6D8C8ED29D3036F94C27BBB7BAD36")
        self.assertEqual(storage["datacenter"]["location"], "USA")
        self.assertEqual(storage["datacenter"]["country_code"], "US")
        self.assertEqual(storage["cloudpanel_id"], "vid35780")
        self.assertEqual(storage["size_used"], "0.00")
        self.assertEqual(storage["cifs_path"], "vid50995.nas1.lanvid50995")
        self.assertEqual(storage["nfs_path"], "vid50995.nas1.lan/:vid50995")
        self.assertEqual(storage["name"], "My shared storage test")
        self.assertEqual(storage["creation_date"], "2015-05-06T08:33:25+00:00")

        first_server = storage["servers"][0]
        self.assertEqual(first_server["id"], "638ED28205B1AFD7ADEF569C725DD85F")
        self.assertEqual(first_server["name"], "My server 1")
        self.assertEqual(first_server["rights"], "RW")

    def test_ex_create_shared_storage(self):
        storage = self.driver.ex_create_shared_storage(name="TEST", size=2, datacenter_id="dc_id")
        self.assertIsNotNone(storage)

    def test_ex_delete_shared_storage(self):
        storage = self.driver.ex_delete_shared_storage("storage_1")
        self.assertIsNotNone(storage)

    def test_ex_attach_server_to_shared_storage(self):
        storage = self.driver.ex_attach_server_to_shared_storage("storage_1", "srv_1", "RW")
        self.assertIsNotNone(storage)

    def test_ex_get_shared_storage_server(self):
        storage = self.driver.ex_get_shared_storage_server("storage_1", "srv_1")
        self.assertIsNotNone(storage)

    def test_ex_detach_server_from_shared_storage(self):
        storage = self.driver.ex_detach_server_from_shared_storage("storage_1", "srv_1")
        self.assertIsNotNone(storage)

    # ------------------------------------------------------------------
    # Load balancers
    # ------------------------------------------------------------------

    def test_ex_create_load_balancers(self):
        rules = [
            {
                "protocol": "TCP",
                "port_balancer": 80,
                "port_server": 80,
                "source": "0.0.0.0",
            },
            {
                "protocol": "TCP",
                "port_balancer": 9999,
                "port_server": 8888,
                "source": "0.0.0.0",
            },
        ]
        load_balancer = self.driver.ex_create_load_balancer(
            name="name",
            method="ROUNDROBIN",
            rules=rules,
            persistence=True,
            persistence_time=1,
        )
        self.assertIsNotNone(load_balancer)

    def test_ex_list_load_balancers(self):
        load_balancers = self.driver.ex_list_load_balancers()
        self.assertEqual(len(load_balancers), 2)

    def test_update_load_balancer(self):
        load_balancer = self.driver.ex_update_load_balancer("lb_1", name="new name")
        self.assertIsNotNone(load_balancer)

    def test_ex_add_servers_to_load_balancer(self):
        load_balancer = self.driver.ex_add_servers_to_load_balancer("lb_1", server_ips=["1.1.1.1"])
        self.assertIsNotNone(load_balancer)

    def test_ex_remove_server_from_load_balancer(self):
        load_balancer = self.driver.ex_remove_server_from_load_balancer("lb_1", server_ip="srv_1")
        self.assertIsNotNone(load_balancer)

    def test_ex_add_load_balancer_rule(self):
        load_balancer = self.driver.ex_add_load_balancer_rule(
            "lb_1", protocol="TCP", port_balancer=82, port_server=81, source="0.0.0.0"
        )
        self.assertIsNotNone(load_balancer)

    def test_ex_remove_load_balancer_rule(self):
        load_balancer = self.driver.ex_remove_load_balancer_rule("lb_1", "rule_1")
        self.assertIsNotNone(load_balancer)

    def test_ex_get_load_balancer(self):
        load_balancer = self.driver.ex_get_load_balancer("lb_1")
        self.assertIsNotNone(load_balancer)

    def test_ex_get_load_balancer_server_ip(self):
        server_ip = self.driver.ex_get_load_balancer_server_ip("lb_1", "srv_1")
        self.assertIsNotNone(server_ip)

    def test_ex_list_load_balancer_rules(self):
        rules = self.driver.ex_list_load_balancer_rules("lb_1")
        self.assertIsNotNone(rules)
        self.assertEqual(len(rules), 2)

    def test_ex_get_load_balancer_rule(self):
        rule = self.driver.ex_get_load_balancer_rule("lb_1", "rule_1")
        self.assertIsNotNone(rule)

    def test_ex_delete_load_balancer(self):
        load_balancer = self.driver.ex_delete_load_balancer("lb_1")
        self.assertIsNotNone(load_balancer)

    # ------------------------------------------------------------------
    # Public IPs
    # ------------------------------------------------------------------

    def test_ex_list_public_ips(self):
        ips = self.driver.ex_list_public_ips()
        self.assertIsNotNone(ips)
        self.assertEqual(len(ips), 3)

    def test_ex_create_public_ip(self):
        ip = self.driver.ex_create_public_ip("IPv4")
        self.assertIsNotNone(ip)

    def test_ex_get_public_ip(self):
        ip = self.driver.ex_get_public_ip("ip_1")
        self.assertIsNotNone(ip)

    def test_ex_delete_public_ip(self):
        ip = self.driver.ex_delete_public_ip("ip_1")
        self.assertIsNotNone(ip)

    def test_ex_update_public_ip(self):
        ip = self.driver.ex_update_public_ip("ip_1", "reverse.dns")
        self.assertIsNotNone(ip)

    # ------------------------------------------------------------------
    # Monitoring policies
    # ------------------------------------------------------------------

    def test_ex_create_monitoring_policy(self):
        ports = [
            {
                "protocol": "TCP",
                "port": "22",
                "alert_if": "RESPONDING",
                "email_notification": True,
            }
        ]
        processes = [{"process": "test", "alert_if": "NOT_RUNNING", "email_notification": True}]

        policy = self.driver.ex_create_monitoring_policy(
            name="test_name",
            thresholds=self._monitoring_thresholds(),
            ports=ports,
            processes=processes,
            description="description",
            email="test@domain.com",
            agent=True,
        )
        self.assertIsNotNone(policy)

    def test_ex_list_monitoring_policies(self):
        policies = self.driver.ex_list_monitoring_policies()
        self.assertIsNotNone(policies)
        self.assertEqual(len(policies), 2)

    def test_ex_get_monitoring_policy(self):
        policy = self.driver.ex_get_monitoring_policy("pol_1")
        self.assertIsNotNone(policy)

    def test_ex_update_monitoring_policy(self):
        policy = self.driver.ex_update_monitoring_policy(
            "pol_1",
            email="test@domain.com",
            thresholds=self._monitoring_thresholds(),
            name="new name",
            description="new description",
        )
        self.assertIsNotNone(policy)

    def test_ex_get_monitoring_policy_ports(self):
        ports = self.driver.ex_get_monitoring_policy_ports("pol_1")
        self.assertIsNotNone(ports)
        self.assertEqual(len(ports), 2)

    def test_ex_get_monitoring_policy_port(self):
        port = self.driver.ex_get_monitoring_policy_port("pol_1", "port_1")
        self.assertIsNotNone(port)

    def test_ex_remove_monitoring_policy_port(self):
        port = self.driver.ex_remove_monitoring_policy_port("pol_1", "port_1")
        self.assertIsNotNone(port)

    def test_ex_add_monitoring_policy_ports(self):
        new_ports = [
            {
                "protocol": "TCP",
                "port": "80",
                "alert_if": "RESPONDING",
                "email_notification": True,
            }
        ]
        ports = self.driver.ex_add_monitoring_policy_ports("pol_1", new_ports)
        self.assertIsNotNone(ports)
        self.assertEqual(len(ports), 2)

    def test_ex_get_monitoring_policy_processes(self):
        processes = self.driver.ex_get_monitoring_policy_processes("pol_1")
        self.assertIsNotNone(processes)

    def test_ex_get_monitoring_policy_process(self):
        process = self.driver.ex_get_monitoring_policy_process("pol_1", "proc_1")
        self.assertIsNotNone(process)

    def test_ex_remove_monitoring_policy_process(self):
        policy = self.driver.ex_remove_monitoring_policy_process("pol_1", "proc_1")
        self.assertIsNotNone(policy)

    def test_ex_add_monitoring_policy_processes(self):
        # Keep the request payload and the driver's result under separate names.
        new_processes = {
            "processes": [
                {
                    "process": "taskmmgr",
                    "alert_if": "RUNNING",
                    "email_notification": True,
                }
            ]
        }
        processes = self.driver.ex_add_monitoring_policy_processes(
            policy_id="pol_1", processes=new_processes
        )
        self.assertIsNotNone(processes)
        self.assertEqual(len(processes), 2)

    def test_ex_list_monitoring_policy_servers(self):
        servers = self.driver.ex_list_monitoring_policy_servers("pol_1")
        self.assertIsNotNone(servers)
        self.assertEqual(len(servers), 2)

    def test_ex_add_servers_to_monitoring_policy(self):
        servers = self.driver.ex_add_servers_to_monitoring_policy("pol_1", "serv_1")
        self.assertIsNotNone(servers)
        self.assertEqual(len(servers), 2)

    def test_ex_remove_server_from_monitoring_policy(self):
        policy = self.driver.ex_remove_server_from_monitoring_policy("pol_1", "serv_1")
        self.assertIsNotNone(policy)
| |
| |
class OneAndOneMockHttp(MockHttp):
    """Mock HTTP class that answers 1&1 API requests from JSON fixture files.

    Every handler takes ``(method, url, body, headers)`` positionally and
    returns a ``(status, body, headers, reason)`` tuple. Handler names mirror
    the request paths used by the tests (presumably the ``MockHttp`` base class
    picks the handler from the request path -- confirm against ``MockHttp``).
    Handlers that branch on ``method`` return ``None`` for methods they do not
    list.
    """

    fixtures = ComputeFileFixtures("oneandone")

    # Response headers used by the handlers that declare a JSON content type.
    _JSON_HEADERS = {"content-type": "application/json"}

    def _fixture_response(self, fixture, status=httplib.OK, headers=None):
        """Load *fixture* and wrap it in a ``(status, body, headers, reason)`` tuple.

        :param fixture: File name of the fixture used as the response body.
        :param status: HTTP status code of the response (``httplib.OK`` by default).
        :param headers: Response headers; a new empty dict when omitted.
        """
        payload = self.fixtures.load(fixture)
        response_headers = {} if headers is None else dict(headers)
        return (status, payload, response_headers, httplib.responses[status])

    # ------------------------------------------------------------------
    # Server appliances (images)
    # ------------------------------------------------------------------

    def _v1_server_appliances(self, method, url, body, headers):
        return self._fixture_response("list_images.json")

    def _v1_server_appliances_img_1(self, method, url, body, headers):
        return self._fixture_response("get_image.json")

    # ------------------------------------------------------------------
    # Servers
    # ------------------------------------------------------------------

    def _v1_servers(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("list_servers.json")
        if method == "POST":
            return self._fixture_response("create_node.json", httplib.ACCEPTED)

    def _v1_create_node(self, method, url, body, headers):
        # The original declared a single ``body_headers`` parameter (missing
        # comma) and returned the body and headers in swapped positions.
        return self._fixture_response("list_servers.json", httplib.ACCEPTED)

    def _v1_datacenters(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response(
                "ex_list_datacenters.json", headers=self._JSON_HEADERS
            )

    def _v1_servers_srv_1(self, method, url, body, headers):
        if method in ("PUT", "GET"):
            return self._fixture_response("describe_server.json", headers=self._JSON_HEADERS)
        if method == "DELETE":
            return self._fixture_response("describe_server.json")

    def _v1_servers_srv_1_status_action(self, method, url, body, headers):
        # Parameters were previously named ``body_headers`` and ``id``; the
        # handler only worked because it is called positionally.
        return self._fixture_response("describe_server.json", httplib.ACCEPTED)

    def _v1_servers_srv_1_hardware(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("server_hardware.json")
        if method == "PUT":
            return self._fixture_response("describe_server.json", httplib.ACCEPTED)

    def _v1_servers_srv_1_hardware_hdds(self, method, url, body, headers):
        if method == "POST":
            return self._fixture_response("describe_server.json")
        if method == "PUT":
            return self._fixture_response("describe_server.json", httplib.ACCEPTED)

    def _v1_servers_srv_1_hardware_hdds_hdd_id(self, method, url, body, headers):
        if method == "DELETE":
            return self._fixture_response("describe_server.json")
        if method == "PUT":
            return self._fixture_response("describe_server.json", httplib.ACCEPTED)

    def _v1_servers_srv_1_image(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("get_server_image.json")
        if method == "PUT":
            return self._fixture_response("describe_server.json")

    def _v1_servers_srv_1_ips(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("server_ips.json")
        if method == "POST":
            return self._fixture_response("describe_server.json")

    def _v1_servers_srv_1_ips_ip_id(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("server_ip.json")
        if method == "DELETE":
            return self._fixture_response("describe_server.json")

    def _v1_servers_fixed_instance_sizes(self, method, url, body, headers):
        return self._fixture_response("fixed_instance_sizes.json")

    # ------------------------------------------------------------------
    # Firewall policies
    # ------------------------------------------------------------------

    def _v1_firewall_policies(self, method, url, body, headers):
        if method == "POST":
            return self._fixture_response("describe_firewall_policy.json")
        if method == "GET":
            return self._fixture_response("list_firewall_policies.json")

    def _v1_firewall_policy_fw_id(self, method, url, body, headers):
        if method in ("GET", "DELETE"):
            return self._fixture_response("describe_firewall_policy.json")

    def _v1_servers_srv_id_ips_ip_id_firewall_policy(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("describe_id_firewall_policy.json")
        if method in ("DELETE", "POST"):
            return self._fixture_response("describe_server.json")

    # ------------------------------------------------------------------
    # Shared storages
    # ------------------------------------------------------------------

    def _v1_shared_storages(self, method, url, body, headers):
        if method in ("GET", "POST"):
            return self._fixture_response("list_shared_storages.json")

    def _v1_shared_storages_storage_1(self, method, url, body, headers):
        if method in ("GET", "DELETE"):
            return self._fixture_response("shared_storage.json")

    def _v1_shared_storages_storage_1_servers(self, method, url, body, headers):
        if method in ("POST", "DELETE"):
            return self._fixture_response("shared_storage.json")

    def _v1_shared_storages_storage_1_servers_srv_1(self, method, url, body, headers):
        if method in ("GET", "DELETE"):
            return self._fixture_response("shared_storage.json")

    # ------------------------------------------------------------------
    # Load balancers
    # ------------------------------------------------------------------

    def _v1_load_balancers(self, method, url, body, headers):
        if method == "POST":
            return self._fixture_response("load_balancer.json")
        if method == "GET":
            return self._fixture_response("list_load_balancer.json")

    def _v1_load_balancers_lb_1(self, method, url, body, headers):
        return self._fixture_response("load_balancer.json")

    def _v1_load_balancers_lb_1_server_ips(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("load_balancer_server_ips.json")
        if method == "POST":
            return self._fixture_response("load_balancer.json")

    def _v1_load_balancers_lb_1_rules(self, method, url, body, headers):
        if method == "POST":
            return self._fixture_response("load_balancer.json")
        if method == "GET":
            return self._fixture_response("load_balancer_rules.json")

    def _v1_load_balancers_lb_1_server_ips_srv_1(self, method, url, body, headers):
        if method == "DELETE":
            return self._fixture_response("load_balancer.json")
        if method == "GET":
            return self._fixture_response("load_balancer_server_ip.json")

    def _v1_load_balancers_lb_1_rules_rule_1(self, method, url, body, headers):
        if method == "DELETE":
            return self._fixture_response("load_balancer.json")
        if method == "GET":
            return self._fixture_response("load_balancer_rule.json")

    # ------------------------------------------------------------------
    # Public IPs
    # ------------------------------------------------------------------

    def _v1_public_ips(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("list_public_ips.json")
        if method == "POST":
            return self._fixture_response("public_ip.json")

    def _v1_public_ips_ip_1(self, method, url, body, headers):
        return self._fixture_response("public_ip.json")

    # ------------------------------------------------------------------
    # Monitoring policies
    #
    # The three ``pol_1_*`` item handlers below used to match only GET and
    # POST; for any other method (e.g. a DELETE issued for a removal) they
    # echoed the *request* body back as the response. They now answer every
    # non-GET method with the policy fixture.
    # ------------------------------------------------------------------

    def _v1_monitoring_policies(self, method, url, body, headers):
        if method == "POST":
            return self._fixture_response("monitoring_policy.json")
        if method == "GET":
            return self._fixture_response("list_monitoring_policies.json")

    def _v1_monitoring_policies_pol_1(self, method, url, body, headers):
        return self._fixture_response("monitoring_policy.json")

    def _v1_monitoring_policies_pol_1_ports(self, method, url, body, headers):
        return self._fixture_response("monitoring_policy_ports.json")

    def _v1_monitoring_policies_pol_1_ports_port_1(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("monitoring_policy_port.json")
        return self._fixture_response("monitoring_policy.json")

    def _v1_monitoring_policies_pol_1_processes(self, method, url, body, headers):
        return self._fixture_response("monitoring_policy_processes.json")

    def _v1_monitoring_policies_pol_1_processes_proc_1(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("monitoring_policy_process.json")
        return self._fixture_response("monitoring_policy.json")

    def _v1_monitoring_policies_pol_1_servers(self, method, url, body, headers):
        return self._fixture_response("monitoring_policy_servers.json")

    def _v1_monitoring_policies_pol_1_servers_serv_1(self, method, url, body, headers):
        if method == "GET":
            return self._fixture_response("monitoring_policy_servers.json")
        return self._fixture_response("monitoring_policy.json")
| |
| |
if __name__ == "__main__":
    # Run this module's tests when executed as a script and hand the result
    # of unittest.main() to sys.exit().
    exit_status = unittest.main()
    sys.exit(exit_status)