| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ |
| NTT CIS Driver |
| """ |
| import re |
| import sys |
| |
| from libcloud.utils.py3 import ET, urlencode, basestring, ensure_string |
| from libcloud.utils.xml import findall, findtext, fixxpath |
| from libcloud.compute.base import ( |
| Node, |
| NodeSize, |
| NodeImage, |
| NodeDriver, |
| NodeLocation, |
| NodeAuthPassword, |
| ) |
| from libcloud.common.nttcis import ( |
| TYPES_URN, |
| GENERAL_NS, |
| NETWORK_NS, |
| API_ENDPOINTS, |
| DEFAULT_REGION, |
| NttCisNic, |
| NttCisTag, |
| NttCisPort, |
| NttCisVlan, |
| LooseVersion, |
| NttCisStatus, |
| NttCisTagKey, |
| NttCisNatRule, |
| NttCisNetwork, |
| NttCisPortList, |
| NttCisIpAddress, |
| NttCisConnection, |
| NttCisServerDisk, |
| NttCisAPIException, |
| NttCisFirewallRule, |
| NttCisChildPortList, |
| NttCisIpAddressList, |
| NttCisNetworkDomain, |
| NttCisPublicIpBlock, |
| NttCisScsiController, |
| NttCisFirewallAddress, |
| NttCisAntiAffinityRule, |
| NttCisReservedIpAddress, |
| NttCisServerVMWareTools, |
| NetworkDomainServicePlan, |
| NttCisChildIpAddressList, |
| NttCisServerCpuSpecification, |
| get_params, |
| process_xml, |
| dd_object_to_id, |
| ) |
| from libcloud.compute.types import Provider, NodeState |
| from libcloud.common.exceptions import BaseHTTPError |
| |
| # Node state map is a dictionary with the keys as tuples |
| # These tuples represent: |
| # (<state_of_node_from_didata>, <is node started?>, <action happening>) |
| NODE_STATE_MAP = { |
| ("NORMAL", "false", None): NodeState.STOPPED, |
| ("PENDING_CHANGE", "false", None): NodeState.PENDING, |
| ("PENDING_CHANGE", "false", "CHANGE_NETWORK_ADAPTER"): NodeState.PENDING, |
| ("PENDING_CHANGE", "true", "CHANGE_NETWORK_ADAPTER"): NodeState.PENDING, |
| ("PENDING_CHANGE", "false", "EXCHANGE_NIC_VLANS"): NodeState.PENDING, |
| ("PENDING_CHANGE", "true", "EXCHANGE_NIC_VLANS"): NodeState.PENDING, |
| ("NORMAL", "true", None): NodeState.RUNNING, |
| ("PENDING_CHANGE", "true", "START_SERVER"): NodeState.STARTING, |
| ("PENDING_ADD", "true", "DEPLOY_SERVER"): NodeState.STARTING, |
| ("PENDING_ADD", "true", "DEPLOY_SERVER_WITH_DISK_SPEED"): NodeState.STARTING, |
| ("PENDING_CHANGE", "true", "SHUTDOWN_SERVER"): NodeState.STOPPING, |
| ("PENDING_CHANGE", "true", "POWER_OFF_SERVER"): NodeState.STOPPING, |
| ("PENDING_CHANGE", "true", "REBOOT_SERVER"): NodeState.REBOOTING, |
| ("PENDING_CHANGE", "true", "RESET_SERVER"): NodeState.REBOOTING, |
| ("PENDING_CHANGE", "true", "RECONFIGURE_SERVER"): NodeState.RECONFIGURING, |
| } |
| |
| OBJECT_TO_TAGGING_ASSET_TYPE_MAP = { |
| "Node": "SERVER", |
| "NodeImage": "CUSTOMER_IMAGE", |
| "NttCisNetworkDomain": "NETWORK_DOMAIN", |
| "NttCisVlan": "VLAN", |
| "NttCisPublicIpBlock": "PUBLIC_IP_BLOCK", |
| } |
| |
| |
| class NttCisNodeDriver(NodeDriver): |
| """ |
| NttCis node driver. |
| Default api_version is used unless specified. |
| """ |
| |
| selected_region = None |
| connectionCls = NttCisConnection |
| name = "NTTC-CIS" |
| website = "https://www.us.ntt.com/en/services/cloud/enterprise-cloud.html" |
| type = Provider.NTTCIS |
| features = {"create_node": ["password"]} |
| api_version = 1.0 |
| |
| def __init__( |
| self, |
| key, |
| secret=None, |
| secure=True, |
| host=None, |
| port=None, |
| api_version=None, |
| region=DEFAULT_REGION, |
| **kwargs, |
| ): |
| |
| if region not in API_ENDPOINTS and host is None: |
| raise ValueError("Invalid region: %s, no host specified" % (region)) |
| if region is not None: |
| self.selected_region = API_ENDPOINTS[region] |
| |
| if api_version is not None: |
| self.api_version = api_version |
| |
| super().__init__( |
| key=key, |
| secret=secret, |
| secure=secure, |
| host=host, |
| port=port, |
| api_version=api_version, |
| region=region, |
| **kwargs, |
| ) |
| |
| def _ex_connection_class_kwargs(self): |
| """ |
| Add the region to the kwargs before the connection is instantiated |
| """ |
| |
| kwargs = super()._ex_connection_class_kwargs() |
| kwargs["region"] = self.selected_region |
| kwargs["api_version"] = self.api_version |
| return kwargs |
| |
| def _create_node_mcp1( |
| self, |
| name, |
| image, |
| auth, |
| ex_description, |
| ex_network=None, |
| ex_memory_gb=None, |
| ex_cpu_specification=None, |
| ex_is_started=True, |
| ex_primary_dns=None, |
| ex_secondary_dns=None, |
| **kwargs, |
| ): |
| """ |
| Create a new NTTCIS node |
| |
| :keyword name: String with a name for this new node (required) |
| :type name: ``str`` |
| |
| :keyword image: OS Image to boot on node. (required) |
| :type image: :class:`NodeImage` or ``str`` |
| |
| :keyword auth: Initial authentication information for the |
| node. (If this is a customer LINUX |
| image auth will be ignored) |
| :type auth: :class:`NodeAuthPassword` or ``str`` or |
| ``None`` |
| |
| :keyword ex_description: description for this node (required) |
| :type ex_description: ``str`` |
| |
| :keyword ex_network: Network to create the node within |
| (required unless using ex_network_domain |
| or ex_primary_ipv4) |
| |
| :type ex_network: :class:`NttCisNetwork` or ``str`` |
| |
| :keyword ex_memory_gb: The amount of memory in GB for the |
| server |
| :type ex_memory_gb: ``int`` |
| |
| :keyword ex_cpu_specification: The spec of CPU to deploy ( |
| optional) |
| :type ex_cpu_specification: |
| :class:`DimensionDataServerCpuSpecification` |
| |
| :keyword ex_is_started: Start server after creation? default |
| true (required) |
| :type ex_is_started: ``bool`` |
| |
| :keyword ex_primary_dns: The node's primary DNS |
| |
| :type ex_primary_dns: ``str`` |
| |
| :keyword ex_secondary_dns: The node's secondary DNS |
| |
| :type ex_secondary_dns: ``str`` |
| |
| :return: The newly created :class:`Node`. |
| :rtype: :class:`Node` |
| """ |
| |
| password = None |
| image_needs_auth = self._image_needs_auth(image) |
| if image_needs_auth: |
| if isinstance(auth, basestring): |
| auth_obj = NodeAuthPassword(password=auth) |
| password = auth |
| else: |
| auth_obj = self._get_and_check_auth(auth) |
| password = auth_obj.password |
| |
| server_elm = ET.Element("deployServer", {"xmlns": TYPES_URN}) |
| ET.SubElement(server_elm, "name").text = name |
| ET.SubElement(server_elm, "description").text = ex_description |
| image_id = self._image_to_image_id(image) |
| ET.SubElement(server_elm, "imageId").text = image_id |
| ET.SubElement(server_elm, "start").text = str(ex_is_started).lower() |
| if password is not None: |
| ET.SubElement(server_elm, "administratorPassword").text = password |
| |
| if ex_cpu_specification is not None: |
| cpu = ET.SubElement(server_elm, "cpu") |
| cpu.set("speed", ex_cpu_specification.performance) |
| cpu.set("count", str(ex_cpu_specification.cpu_count)) |
| cpu.set("coresPerSocket", str(ex_cpu_specification.cores_per_socket)) |
| |
| if ex_memory_gb is not None: |
| ET.SubElement(server_elm, "memoryGb").text = str(ex_memory_gb) |
| |
| if ex_network is not None: |
| network_elm = ET.SubElement(server_elm, "network") |
| network_id = self._network_to_network_id(ex_network) |
| ET.SubElement(network_elm, "networkId").text = network_id |
| |
| if ex_primary_dns: |
| dns_elm = ET.SubElement(server_elm, "primaryDns") |
| dns_elm.text = ex_primary_dns |
| |
| if ex_secondary_dns: |
| dns_elm = ET.SubElement(server_elm, "secondaryDns") |
| dns_elm.text = ex_secondary_dns |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/deployServer", method="POST", data=ET.tostring(server_elm) |
| ).object |
| |
| node_id = None |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "serverId": |
| node_id = info.get("value") |
| |
| node = self.ex_get_node_by_id(node_id) |
| |
| if image_needs_auth: |
| if getattr(auth_obj, "generated", False): |
| node.extra["password"] = auth_obj.password |
| |
| return node |
| |
| def create_node( |
| self, |
| name, |
| image, |
| auth, |
| ex_network_domain=None, |
| ex_primary_nic_private_ipv4=None, |
| ex_primary_nic_vlan=None, |
| ex_primary_nic_network_adapter=None, |
| ex_additional_nics=None, |
| ex_description=None, |
| ex_disks=None, |
| ex_cpu_specification=None, |
| ex_memory_gb=None, |
| ex_is_started=True, |
| ex_primary_dns=None, |
| ex_secondary_dns=None, |
| ex_ipv4_gateway=None, |
| ex_microsoft_time_zone=None, |
| **kwargs, |
| ): |
| """ |
| Create a new NTTCIS node in MCP2. However, it is still |
| backward compatible for MCP1 for a limited time. Please consider |
| using MCP2 datacenter as MCP1 will phase out soon. |
| |
| Legacy Create Node for MCP1 datacenter |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.base import NodeAuthPassword |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = False |
| >>> NTTCIS = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Password |
| >>> root_pw = NodeAuthPassword('password123') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU1') |
| >>> |
| >>> # Get network by location |
| >>> my_network = driver.list_networks(location=location)[0] |
| >>> pprint(my_network) |
| >>> |
| >>> # Get Image |
| >>> images = driver.list_images(location=location) |
| >>> image = images[0] |
| >>> |
| >>> node = driver.create_node(name='test_blah_2', image=image, |
| >>> auth=root_pw, |
| >>> ex_description='test3 node', |
| >>> ex_network=my_network, |
| >>> ex_is_started=False) |
| >>> pprint(node) |
| |
| |
| Create Node in MCP2 Data CenterF |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.base import NodeAuthPassword |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Password |
| >>> root_pw = NodeAuthPassword('password123') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| networkDomainName][0] |
| >>> |
| >>> vlan = driver.ex_list_vlans(location=location, |
| >>> network_domain=my_network_domain)[0] |
| >>> pprint(vlan) |
| >>> |
| >>> # Get Image |
| >>> images = driver.list_images(location=location) |
| >>> image = images[0] |
| >>> |
| >>> # Create node using vlan instead of private IPv4 |
| >>> node = driver.create_node(name='test_server_01', image=image, |
| >>> auth=root_pw, |
| >>> ex_description='test2 node', |
| >>> ex_network_domain=my_network_domain, |
| >>> ex_primary_nic_vlan=vlan, |
| >>> ex_is_started=False) |
| >>> |
| >>> # Option: Create node using private IPv4 instead of vlan |
| >>> # node = driver.create_node(name='test_server_02', image=image, |
| >>> # auth=root_pw, |
| >>> # ex_description='test2 node', |
| >>> # ex_network_domain=my_network_domain, |
| >>> # ex_primary_nic_private_ipv4='10.1.1.7', |
| >>> # ex_is_started=False) |
| >>> |
| >>> # Option: Create node using by specifying Network Adapter |
| >>> # node = driver.create_node(name='test_server_03', image=image, |
| >>> # auth=root_pw, |
| >>> # ex_description='test2 node', |
| >>> # ex_network_domain=my_network_domain, |
| >>> # ex_primary_nic_vlan=vlan, |
| >>> # ex_primary_nic_network_adapter='E1000', |
| >>> # ex_is_started=False) |
| >>> |
| |
| :keyword name: (required) String with a name for this new node |
| :type name: ``str`` |
| |
| :keyword image: (required) OS Image to boot on node. |
| :type image: :class:`NodeImage` or ``str`` |
| |
| :keyword auth: Initial authentication information for the |
| node. (If this is a customer LINUX |
| image auth will be ignored) |
| :type auth: :class:`NodeAuthPassword` or ``str`` or ``None`` |
| |
| :keyword ex_description: (optional) description for this node |
| :type ex_description: ``str`` |
| |
| |
| :keyword ex_network_domain: (required) Network Domain or Network |
| Domain ID to create the node |
| :type ex_network_domain: :class:`DimensionDataNetworkDomain` |
| or ``str`` |
| |
| :keyword ex_primary_nic_private_ipv4: Provide private IPv4. Ignore |
| if ex_primary_nic_vlan is |
| provided. Use one or the |
| other. Not both. |
| :type ex_primary_nic_private_ipv4: :``str`` |
| |
| :keyword ex_primary_nic_vlan: Provide VLAN for the node if |
| ex_primary_nic_private_ipv4 NOT |
| provided. One or the other. Not both. |
| :type ex_primary_nic_vlan: :class: DimensionDataVlan or ``str`` |
| |
| :keyword ex_primary_nic_network_adapter: (Optional) Default value |
| for the Operating System |
| will be used if leave |
| empty. Example: "E1000". |
| :type ex_primary_nic_network_adapter: :``str`` |
| |
| :keyword ex_additional_nics: (optional) List |
| :class:'NttCisNic' or None |
| :type ex_additional_nics: ``list`` of :class:'NttCisNic' |
| or ``str`` |
| |
| :keyword ex_memory_gb: (optional) The amount of memory in GB for |
| the server Can be used to override the |
| memory value inherited from the source |
| Server Image. |
| :type ex_memory_gb: ``int`` |
| |
| :keyword ex_cpu_specification: (optional) The spec of CPU to deploy |
| :type ex_cpu_specification: |
| :class:`DimensionDataServerCpuSpecification` |
| |
| :keyword ex_is_started: (required) Start server after creation. |
| Default is set to true. |
| :type ex_is_started: ``bool`` |
| |
| :keyword ex_primary_dns: (Optional) The node's primary DNS |
| :type ex_primary_dns: ``str`` |
| |
| :keyword ex_secondary_dns: (Optional) The node's secondary DNS |
| :type ex_secondary_dns: ``str`` |
| |
| :keyword ex_ipv4_gateway: (Optional) IPv4 address in dot-decimal |
| notation, which will be used as the |
| Primary NIC gateway instead of the default |
| gateway assigned by the system. If |
| ipv4Gateway is provided it does not have |
| to be on the VLAN of the Primary NIC |
| but MUST be reachable or the Guest OS |
| will not be configured correctly. |
| :type ex_ipv4_gateway: ``str`` |
| |
| :keyword ex_disks: (optional) NTTCIS disks. Optional disk |
| elements can be used to define the disk speed |
| that each disk on the Server; inherited from the |
| source Server Image will be deployed to. It is |
| not necessary to include a diskelement for every |
| disk; only those that you wish to set a disk |
| speed value for. Note that scsiId 7 cannot be |
| used.Up to 13 disks can be present in addition to |
| the required OS disk on SCSI ID 0. Refer to |
| https://docs.mcp-services.net/x/UwIu for disk |
| |
| :type ex_disks: List or tuple of :class:'DimensionDataServerDisk` |
| |
| :keyword ex_microsoft_time_zone: (optional) For use with |
| Microsoft Windows source Server Images only. For the exact |
| value to use please refer to the table of time zone |
| indexes in the following Microsoft Technet |
| documentation. If none is supplied, the default time |
| zone for the data center geographic region will be used. |
| :type ex_microsoft_time_zone: `str`` |
| |
| |
| :return: The newly created :class:`Node`. |
| :rtype: :class:`Node` |
| """ |
| |
| # Neither legacy MCP1 network nor MCP2 network domain provided |
| if ex_network_domain is None and "ex_network" not in kwargs: |
| raise ValueError( |
| "You must provide either ex_network_domain " |
| "for MCP2 or ex_network for legacy MCP1" |
| ) |
| |
| # Ambiguous parameter provided. Can't determine if it is MCP 1 or 2. |
| if ex_network_domain is not None and "ex_network" in kwargs: |
| raise ValueError( |
| "You can only supply either " |
| "ex_network_domain " |
| "for MCP2 or ex_network for legacy MCP1" |
| ) |
| |
| # Set ex_is_started to False by default if none bool data type provided |
| if not isinstance(ex_is_started, bool): |
| ex_is_started = True |
| |
| # Handle MCP1 legacy |
| if "ex_network" in kwargs: |
| new_node = self._create_node_mcp1( |
| name=name, |
| image=image, |
| auth=auth, |
| ex_network=kwargs.get("ex_network"), |
| ex_description=ex_description, |
| ex_memory_gb=ex_memory_gb, |
| ex_cpu_specification=ex_cpu_specification, |
| ex_is_started=ex_is_started, |
| ex_primary_ipv4=ex_primary_nic_private_ipv4, |
| ex_disks=ex_disks, |
| ex_additional_nics_vlan=kwargs.get("ex_additional_nics_vlan"), |
| ex_additional_nics_ipv4=kwargs.get("ex_additional_nics_ipv4"), |
| ex_primary_dns=ex_primary_dns, |
| ex_secondary_dns=ex_secondary_dns, |
| ) |
| else: |
| # Handle MCP2 legacy. CaaS api 2.2 or earlier |
| if "ex_vlan" in kwargs: |
| ex_primary_nic_vlan = kwargs.get("ex_vlan") |
| |
| if "ex_primary_ipv4" in kwargs: |
| ex_primary_nic_private_ipv4 = kwargs.get("ex_primary_ipv4") |
| |
| additional_nics = [] |
| |
| if "ex_additional_nics_vlan" in kwargs: |
| vlans = kwargs.get("ex_additional_nics_vlan") |
| if isinstance(vlans, (list, tuple)): |
| for v in vlans: |
| add_nic = NttCisNic(vlan=v) |
| additional_nics.append(add_nic) |
| else: |
| raise TypeError("ex_additional_nics_vlan must " "be None or a tuple/list") |
| |
| if "ex_additional_nics_ipv4" in kwargs: |
| ips = kwargs.get("ex_additional_nics_ipv4") |
| |
| if isinstance(ips, (list, tuple)): |
| for ip in ips: |
| add_nic = NttCisNic(private_ip_v4=ip) |
| additional_nics.append(add_nic) |
| else: |
| if ips is not None: |
| raise TypeError("ex_additional_nics_ipv4 must " "be None or a tuple/list") |
| |
| if "ex_additional_nics_vlan" in kwargs or "ex_additional_nics_ipv4" in kwargs: |
| ex_additional_nics = additional_nics |
| |
| # Handle MCP2 latest. CaaS API 2.3 onwards |
| if ex_network_domain is None: |
| raise ValueError("ex_network_domain must be specified") |
| |
| password = None |
| image_needs_auth = self._image_needs_auth(image) |
| if image_needs_auth: |
| if isinstance(auth, basestring): |
| auth_obj = NodeAuthPassword(password=auth) |
| password = auth |
| else: |
| auth_obj = self._get_and_check_auth(auth) |
| password = auth_obj.password |
| |
| server_elm = ET.Element("deployServer", {"xmlns": TYPES_URN}) |
| ET.SubElement(server_elm, "name").text = name |
| ET.SubElement(server_elm, "description").text = ex_description |
| image_id = self._image_to_image_id(image) |
| ET.SubElement(server_elm, "imageId").text = image_id |
| ET.SubElement(server_elm, "start").text = str(ex_is_started).lower() |
| if password is not None: |
| ET.SubElement(server_elm, "administratorPassword").text = password |
| |
| if ex_cpu_specification is not None: |
| cpu = ET.SubElement(server_elm, "cpu") |
| cpu.set("speed", ex_cpu_specification.performance) |
| cpu.set("count", str(ex_cpu_specification.cpu_count)) |
| cpu.set("coresPerSocket", str(ex_cpu_specification.cores_per_socket)) |
| |
| if ex_memory_gb is not None: |
| ET.SubElement(server_elm, "memoryGb").text = str(ex_memory_gb) |
| |
| if ex_primary_nic_private_ipv4 is None and ex_primary_nic_vlan is None: |
| raise ValueError( |
| "Missing argument. Either " |
| "ex_primary_nic_private_ipv4 or " |
| "ex_primary_nic_vlan " |
| "must be specified." |
| ) |
| |
| if ex_primary_nic_private_ipv4 is not None and ex_primary_nic_vlan is not None: |
| raise ValueError( |
| "Either ex_primary_nic_private_ipv4 or " |
| "ex_primary_nic_vlan " |
| "be specified. Not both." |
| ) |
| |
| network_elm = ET.SubElement(server_elm, "networkInfo") |
| |
| net_domain_id = self._network_domain_to_network_domain_id(ex_network_domain) |
| |
| network_elm.set("networkDomainId", net_domain_id) |
| |
| pri_nic = ET.SubElement(network_elm, "primaryNic") |
| |
| if ex_primary_nic_private_ipv4 is not None: |
| ET.SubElement(pri_nic, "privateIpv4").text = ex_primary_nic_private_ipv4 |
| |
| if ex_primary_nic_vlan is not None: |
| vlan_id = self._vlan_to_vlan_id(ex_primary_nic_vlan) |
| ET.SubElement(pri_nic, "vlanId").text = vlan_id |
| |
| if ex_primary_nic_network_adapter is not None: |
| ET.SubElement(pri_nic, "networkAdapter").text = ex_primary_nic_network_adapter |
| |
| if isinstance(ex_additional_nics, (list, tuple)): |
| for nic in ex_additional_nics: |
| additional_nic = ET.SubElement(network_elm, "additionalNic") |
| |
| if nic.private_ip_v4 is None and nic.vlan is None: |
| raise ValueError( |
| "Either a vlan or private_ip_v4 " |
| "must be specified for each " |
| "additional nic." |
| ) |
| |
| if nic.private_ip_v4 is not None and nic.vlan is not None: |
| raise ValueError( |
| "Either a vlan or private_ip_v4 " |
| "must be specified for each " |
| "additional nic. Not both." |
| ) |
| |
| if nic.private_ip_v4 is not None: |
| ET.SubElement(additional_nic, "privateIpv4").text = nic.private_ip_v4 |
| |
| if nic.vlan is not None: |
| vlan_id = self._vlan_to_vlan_id(nic.vlan) |
| ET.SubElement(additional_nic, "vlanId").text = vlan_id |
| |
| if nic.network_adapter_name is not None: |
| ET.SubElement( |
| additional_nic, "networkAdapter" |
| ).text = nic.network_adapter_name |
| elif ex_additional_nics is not None: |
| raise TypeError("ex_additional_NICs must be None or tuple/list") |
| |
| if ex_primary_dns: |
| dns_elm = ET.SubElement(server_elm, "primaryDns") |
| dns_elm.text = ex_primary_dns |
| |
| if ex_secondary_dns: |
| dns_elm = ET.SubElement(server_elm, "secondaryDns") |
| dns_elm.text = ex_secondary_dns |
| |
| if ex_ipv4_gateway: |
| ET.SubElement(server_elm, "ipv4Gateway").text = ex_ipv4_gateway |
| |
| if isinstance(ex_disks, (list, tuple)): |
| for disk in ex_disks: |
| disk_elm = ET.SubElement(server_elm, "disk") |
| disk_elm.set("scsiId", disk.scsi_id) |
| disk_elm.set("speed", disk.speed) |
| elif ex_disks is not None: |
| raise TypeError("ex_disks must be None or tuple/list") |
| |
| if ex_microsoft_time_zone: |
| ET.SubElement(server_elm, "microsoftTimeZone").text = ex_microsoft_time_zone |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/deployServer", method="POST", data=ET.tostring(server_elm) |
| ).object |
| |
| node_id = None |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "serverId": |
| node_id = info.get("value") |
| |
| new_node = self.ex_get_node_by_id(node_id) |
| |
| if image_needs_auth: |
| if getattr(auth_obj, "generated", False): |
| new_node.extra["password"] = auth_obj.password |
| |
| return new_node |
| |
| def destroy_node(self, node): |
| """ |
| Deletes a node, node must be stopped before deletion |
| |
| |
| :keyword node: The node to delete |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request_elm = ET.Element("deleteServer", {"xmlns": TYPES_URN, "id": node.id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/deleteServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def reboot_node(self, node): |
| """ |
| Reboots a node by requesting the OS restart via the hypervisor |
| |
| |
| :keyword node: The node to reboot |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request_elm = ET.Element("rebootServer", {"xmlns": TYPES_URN, "id": node.id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/rebootServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def list_nodes( |
| self, |
| ex_location=None, |
| ex_name=None, |
| ex_ipv6=None, |
| ex_ipv4=None, |
| ex_vlan=None, |
| ex_image=None, |
| ex_deployed=None, |
| ex_started=None, |
| ex_state=None, |
| ex_network_domain=None, |
| ex_snaphots=None, |
| ): |
| """ |
| List nodes deployed for your organization. |
| |
| :keyword ex_location: Filters the node list to nodes that are |
| located in this location |
| :type ex_location: :class:`NodeLocation` or ``str`` |
| |
| :keyword ex_name: Filters the node list to nodes that have this name |
| :type ex_name ``str`` |
| |
| :keyword ex_ipv6: Filters the node list to nodes that have this |
| ipv6 address |
| :type ex_ipv6: ``str`` |
| |
| :keyword ex_ipv4: Filters the node list to nodes that have this |
| ipv4 address |
| :type ex_ipv4: ``str`` |
| |
| :keyword ex_vlan: Filters the node list to nodes that are in this VLAN |
| :type ex_vlan: :class:`DimensionDataVlan` or ``str`` |
| |
| :keyword ex_image: Filters the node list to nodes that have this image |
| :type ex_image: :class:`NodeImage` or ``str`` |
| |
| :keyword ex_deployed: Filters the node list to nodes that are |
| deployed or not |
| :type ex_deployed: ``bool`` |
| |
| :keyword ex_started: Filters the node list to nodes that are |
| started or not |
| :type ex_started: ``bool`` |
| |
| :keyword ex_state: Filters the node list by nodes that are in |
| this state |
| :type ex_state: ``str`` |
| |
| :keyword ex_network_domain: Filters the node list to nodes in this |
| network domain |
| :type ex_network_domain: :class:`NttCisNetworkDomain` |
| or ``str`` |
| |
| :return: a list of `Node` objects |
| :rtype: ``list`` of :class:`Node` |
| """ |
| |
| node_list = [] |
| # This is a generator so we changed from the original |
| # and if nodes is not empty, ie, the stop iteration confdition |
| # Then set node_list to nodes and retrn. |
| for nodes in self.ex_list_nodes_paginated( |
| location=ex_location, |
| name=ex_name, |
| ipv6=ex_ipv6, |
| ipv4=ex_ipv4, |
| vlan=ex_vlan, |
| image=ex_image, |
| deployed=ex_deployed, |
| started=ex_started, |
| state=ex_state, |
| network_domain=ex_network_domain, |
| ): |
| if nodes: |
| node_list = nodes |
| return node_list |
| |
| def list_images(self, location=None): |
| """ |
| List images available |
| |
| Note: Currently only returns the default 'base OS images' |
| provided by NTTCIS. Customer images (snapshots) |
| use ex_list_customer_images |
| |
| :keyword ex_location: Filters the node list to nodes that are |
| located in this location |
| :type ex_location: :class:`NodeLocation` or ``str`` |
| |
| :return: List of images available |
| :rtype: ``list`` of :class:`NodeImage` |
| """ |
| |
| params = {} |
| if location is not None: |
| params["datacenterId"] = self._location_to_location_id(location) |
| |
| return self._to_images( |
| self.connection.request_with_orgId_api_2("image/osImage", params=params).object |
| ) |
| |
| def list_sizes(self, location=None): |
| """ |
| return a list of available sizes |
| Currently, the size of the node is dictated by the chosen OS base |
| image, they cannot be set explicitly. |
| |
| @inherits: :class:`NodeDriver.list_sizes` |
| """ |
| |
| return [ |
| NodeSize( |
| id=1, |
| name="default", |
| ram=0, |
| disk=0, |
| bandwidth=0, |
| price=0, |
| driver=self.connection.driver, |
| ), |
| ] |
| |
| def list_datacenter_properties(self, location): |
| """ |
| return a list of available sizes |
| Currently, the size of the node is dictated by the chosen OS base |
| image, they cannot be set explicitly. |
| |
| @inherits: :class:`NodeDriver.list_sizes` |
| """ |
| |
| return [ |
| NodeSize( |
| id=1, |
| name="default", |
| ram=0, |
| disk=0, |
| bandwidth=0, |
| price=0, |
| driver=self.connection.driver, |
| ), |
| ] |
| |
| def list_locations(self, ex_id=None): |
| """ |
| List locations (datacenters) available for instantiating servers and |
| networks. |
| |
| :keyword ex_id: Filters the location list to this id |
| :type ex_id: ``str`` |
| |
| :return: List of locations |
| :rtype: ``list`` of :class:`NttCisDatacenter` |
| """ |
| |
| params = {} |
| if ex_id is not None: |
| params["id"] = ex_id |
| |
| return self._to_locations( |
| self.connection.request_with_orgId_api_2( |
| "infrastructure/datacenter", params=params |
| ).object |
| ) |
| |
| def list_snapshot_windows(self, location, plan): |
| """ |
| List snapshot windows in a given location |
| :param location: a location object or location id such as "NA9" |
| :param plan: 'ESSENTIALS' or 'ADVANCED' |
| :return: dictionary with keys id, day_of_week, start_hour, availability |
| :rtype: dict |
| """ |
| |
| params = {} |
| params["datacenterId"] = self._location_to_location_id(location) |
| params["servicePlan"] = plan |
| return self._to_windows( |
| self.connection.request_with_orgId_api_2( |
| "infrastructure/snapshotWindow", params=params |
| ).object |
| ) |
| |
| def list_networks(self, location=None): |
| """ |
| List networks deployed across all data center locations for your |
| organization. The response includes the location of each network. |
| |
| |
| :keyword location: The location |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :return: a list of NttCisNetwork objects |
| :rtype: ``list`` of :class:`NttCisNetwork` |
| """ |
| |
| url_ext = "" |
| if location is not None: |
| url_ext = "/" + self._location_to_location_id(location) |
| |
| return self._to_networks( |
| self.connection.request_with_orgId_api_2("networkWithLocation%s" % url_ext).object |
| ) |
| |
| def import_image( |
| self, |
| ovf_package_name, |
| name, |
| cluster_id=None, |
| datacenter_id=None, |
| description=None, |
| is_guest_os_customization=None, |
| tagkey_name_value_dictionaries=None, |
| ): |
| """ |
| Import image |
| |
| :param ovf_package_name: Image OVF package name |
| :type ovf_package_name: ``str`` |
| |
| :param name: Image name |
| :type name: ``str`` |
| |
| :param cluster_id: Provide either cluster_id or datacenter_id |
| :type cluster_id: ``str`` |
| |
| :param datacenter_id: Provide either cluster_id or datacenter_id |
| :type datacenter_id: ``str`` |
| |
| :param description: Optional. Description of image |
| :type description: ``str`` |
| |
| :param is_guest_os_customization: Optional. true for NGOC image |
| :type is_guest_os_customization: ``bool`` |
| |
| :param tagkey_name_value_dictionaries: Optional tagkey name value dict |
| :type tagkey_name_value_dictionaries: dictionaries |
| |
| :return: Return true if successful |
| :rtype: ``bool`` |
| """ |
| |
| # Unsupported for version lower than 2.4 |
| if LooseVersion(self.connection.active_api_version) < LooseVersion("2.4"): |
| raise Exception( |
| "import image is feature is NOT supported in " "api version earlier than 2.4" |
| ) |
| elif cluster_id is None and datacenter_id is None: |
| raise ValueError("Either cluster_id or datacenter_id must be " "provided") |
| elif cluster_id is not None and datacenter_id is not None: |
| raise ValueError( |
| "Cannot accept both cluster_id and " "datacenter_id. Please provide either one" |
| ) |
| else: |
| import_image_elem = ET.Element("urn:importImage", {"xmlns:urn": TYPES_URN}) |
| |
| ET.SubElement(import_image_elem, "urn:ovfPackage").text = ovf_package_name |
| |
| ET.SubElement(import_image_elem, "urn:name").text = name |
| |
| if description is not None: |
| ET.SubElement(import_image_elem, "urn:description").text = description |
| |
| if cluster_id is not None: |
| ET.SubElement(import_image_elem, "urn:clusterId").text = cluster_id |
| else: |
| ET.SubElement(import_image_elem, "urn:datacenterId").text = datacenter_id |
| |
| if is_guest_os_customization is not None: |
| ET.SubElement( |
| import_image_elem, "urn:guestOsCustomization" |
| ).text = is_guest_os_customization |
| |
| if ( |
| tagkey_name_value_dictionaries is not None |
| and len(tagkey_name_value_dictionaries) > 0 |
| ): |
| for k, v in tagkey_name_value_dictionaries.items(): |
| tag_elem = ET.SubElement(import_image_elem, "urn:tag") |
| ET.SubElement(tag_elem, "urn:tagKeyName").text = k |
| if v is not None: |
| ET.SubElement(tag_elem, "urn:value").text = v |
| |
| response = self.connection.request_with_orgId_api_2( |
| "image/importImage", method="POST", data=ET.tostring(import_image_elem) |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def start_node(self, node): |
| """ |
| Powers on an existing deployed server |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request_elm = ET.Element("startServer", {"xmlns": TYPES_URN, "id": node.id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/startServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def stop_node(self, node): |
| """ |
| This function will attempt to "gracefully" stop a server by |
| initiating a shutdown sequence within the guest operating system. |
| A successful response on this function means the system has |
| successfully passed the request into the operating system. |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| request_elm = ET.Element("shutdownServer", {"xmlns": TYPES_URN, "id": node.id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/shutdownServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_nodes_paginated( |
| self, |
| name=None, |
| location=None, |
| ipv6=None, |
| ipv4=None, |
| vlan=None, |
| image=None, |
| deployed=None, |
| started=None, |
| state=None, |
| network=None, |
| network_domain=None, |
| ): |
| """ |
| Return a generator which yields node lists in pages |
| |
| :keyword location: Filters the node list to nodes that are |
| located in this location |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :keyword name: Filters the node list to nodes that have this name |
| :type name ``str`` |
| |
| :keyword ipv6: Filters the node list to nodes that have this |
| ipv6 address |
| :type ipv6: ``str`` |
| |
| :keyword ipv4: Filters the node list to nodes that have this |
| ipv4 address |
| :type ipv4: ``str`` |
| |
| :keyword vlan: Filters the node list to nodes that are in this VLAN |
| :type vlan: :class:`NttCisVlan` or ``str`` |
| |
| :keyword image: Filters the node list to nodes that have this image |
| :type image: :class:`NodeImage` or ``str`` |
| |
| :keyword deployed: Filters the node list to nodes that are |
| deployed or not |
| :type deployed: ``bool`` |
| |
| :keyword started: Filters the node list to nodes that are |
| started or not |
| :type started: ``bool`` |
| :keyword state: Filters the node list to nodes that are in |
| this state |
| :type state: ``str`` |
| :keyword network: Filters the node list to nodes in this network |
| :type network: :class:`NttCisNetwork` or ``str`` |
| |
| :keyword network_domain: Filters the node list to nodes in this |
| network domain |
| :type network_domain: :class:`NttCisNetworkDomain` |
| or ``str`` |
| :return: a list of `Node` objects |
| :rtype: ``generator`` of `list` of :class:`Node` |
| |
| """ |
| |
| params = {} |
| if location is not None: |
| params["datacenterId"] = self._location_to_location_id(location) |
| if ipv6 is not None: |
| params["ipv6"] = ipv6 |
| if ipv4 is not None: |
| params["privateIpv4"] = ipv4 |
| if state is not None: |
| params["state"] = state |
| if started is not None: |
| params["started"] = started |
| if deployed is not None: |
| params["deployed"] = deployed |
| if name is not None: |
| params["name"] = name |
| if network_domain is not None: |
| params["networkDomainId"] = self._network_domain_to_network_domain_id(network_domain) |
| if network is not None: |
| params["networkId"] = self._network_to_network_id(network) |
| if vlan is not None: |
| params["vlanId"] = self._vlan_to_vlan_id(vlan) |
| if image is not None: |
| params["sourceImageId"] = self._image_to_image_id(image) |
| |
| nodes_obj = self._list_nodes_single_page(params) |
| yield self._to_nodes(nodes_obj) |
| |
| while nodes_obj.get("pageCount") >= nodes_obj.get("pageSize"): |
| params["pageNumber"] = int(nodes_obj.get("pageNumber")) + 1 |
| nodes_obj = self._list_nodes_single_page(params) |
| yield self._to_nodes(nodes_obj) |
| |
| def ex_edit_metadata(self, node, name=None, description=None, drs_eligible=None): |
| request_elem = ET.Element("editServerMetadata", {"xmlns": TYPES_URN, "id": node.id}) |
| if name is not None: |
| ET.SubElement(request_elem, "name").text = name |
| if description is not None: |
| ET.SubElement(request_elem, "description").text = description |
| if drs_eligible is not None: |
| ET.SubElement(request_elem, "drsEligible").text = drs_eligible |
| body = self.connection.request_with_orgId_api_2( |
| "server/editServerMetadata", method="POST", data=ET.tostring(request_elem) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_start_node(self, node): |
| # NOTE: This method is here for backward compatibility reasons after |
| # this method was promoted to be part of the standard compute API in |
| # Libcloud v2.7.0 |
| return self.start_node(node=node) |
| |
| def ex_shutdown_graceful(self, node): |
| return self.stop_node(node=node) |
| |
| def ex_power_off(self, node): |
| """ |
| This function will abruptly power-off a server. Unlike |
| ex_shutdown_graceful, success ensures the node will stop but some OS |
| and application configurations may be adversely affected by the |
| equivalent of pulling the power plug out of the machine. |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request_elm = ET.Element("powerOffServer", {"xmlns": TYPES_URN, "id": node.id}) |
| |
| try: |
| body = self.connection.request_with_orgId_api_2( |
| "server/powerOffServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| except (NttCisAPIException, NameError, BaseHTTPError): |
| r = self.ex_get_node_by_id(node.id) |
| response_code = r.state.upper() |
| |
| return response_code in ["IN_PROGRESS", "OK", "STOPPED", "STOPPING"] |
| |
| def ex_reset(self, node): |
| """ |
| This function will abruptly reset a server. Unlike |
| reboot_node, success ensures the node will restart but some OS |
| and application configurations may be adversely affected by the |
| equivalent of pulling the power plug out of the machine. |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request_elm = ET.Element("resetServer", {"xmlns": TYPES_URN, "id": node.id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/resetServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_update_vm_tools(self, node): |
| """ |
| This function triggers an update of the VMware Tools |
| software running on the guest OS of a Server. |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request_elm = ET.Element("updateVmwareTools", {"xmlns": TYPES_URN, "id": node.id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/updateVmwareTools", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_update_node(self, node, name=None, description=None, cpu_count=None, ram_mb=None): |
| """ |
| Update the node, the name, CPU or RAM |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :param name: The new name (optional) |
| :type name: ``str`` |
| |
| :param description: The new description (optional) |
| :type description: ``str`` |
| |
| :param cpu_count: The new CPU count (optional) |
| :type cpu_count: ``int`` |
| |
| :param ram_mb: The new Memory in MB (optional) |
| :type ram_mb: ``int`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| data = {} |
| if name is not None: |
| data["name"] = name |
| if description is not None: |
| data["description"] = description |
| if cpu_count is not None: |
| data["cpuCount"] = str(cpu_count) |
| if ram_mb is not None: |
| data["memory"] = str(ram_mb) |
| body = self.connection.request_with_orgId_api_1( |
| "server/%s" % (node.id), method="POST", data=urlencode(data, True) |
| ).object |
| response_code = findtext(body, "result", GENERAL_NS) |
| return response_code in ["IN_PROGRESS", "SUCCESS"] |
| |
| def ex_enable_snapshots(self, node, window, plan="ADVANCED", initiate="true"): |
| """ |
| Enable snapshot service on a server |
| |
| :param node: Node ID of the node on which to enable snapshots. |
| :type node: ``str`` |
| |
| :param window: The window id of the window in which the |
| snapshot is enabled. |
| :type name: ``str`` |
| |
| :param plan: Pland type 'ESSENTIALS' or 'ADVANCED |
| :type plan: ``str`` |
| |
| :param initiate: Run a snapshot upon configuration of the |
| snapshot. |
| :type ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("enableSnapshotService", {"xmlns": TYPES_URN}) |
| window_id = window |
| plan = plan |
| update_node.set("serverId", node) |
| ET.SubElement(update_node, "servicePlan").text = plan |
| ET.SubElement(update_node, "windowId").text = window_id |
| ET.SubElement(update_node, "initiateManualSnapshot").text = initiate |
| result = self.connection.request_with_orgId_api_2( |
| "snapshot/enableSnapshotService", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def list_snapshots(self, node, page_size=None): |
| """ |
| List snapshots of a server. The list of snapshots can get large. |
| Therefore, page_size is optional to limit this if desired. |
| |
| :param node: Node nameof the node on which to enable snapshots. |
| :type node: ``str`` |
| :param page_size: (Optional) Limit the number of records returned |
| :return snapshots |
| :rtype: ``list`` of `dictionaries` |
| """ |
| |
| params = {} |
| params["serverId"] = self.list_nodes(ex_name=node)[0].id |
| if page_size is not None: |
| params["pageSize"] = page_size |
| |
| return self._to_snapshots( |
| self.connection.request_with_orgId_api_2("snapshot/snapshot", params=params).object |
| ) |
| |
| def get_snapshot(self, snapshot_id): |
| """ |
| Get snapshot of a server by snapshot id. |
| |
| :param snapshot_id: ID of snapshot to retrieve. |
| :type snapshot_id: ``str`` |
| :return a snapshot |
| :rtype: ``dict`` |
| """ |
| |
| return self._to_snapshot( |
| self.connection.request_with_orgId_api_2("snapshot/snapshot/%s" % snapshot_id).object |
| ) |
| |
| def ex_disable_snapshots(self, node): |
| """ |
| Disable snapshots on a server. This also deletes current snapshots. |
| |
| :param node: Node ID of the node on which to enable snapshots. |
| :type node: ``str`` |
| :return True or False |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("disableSnapshotService", {"xmlns": TYPES_URN}) |
| |
| update_node.set("serverId", node) |
| result = self.connection.request_with_orgId_api_2( |
| "snapshot/disableSnapshotService", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_initiate_manual_snapshot(self, name=None, server_id=None): |
| """ |
| Initiate a manual snapshot of server on the fly |
| |
| :param name: optional name of server |
| :type name: ``str`` |
| |
| :param server_id: optinal parameter to use instead of name |
| :type `server_id`str`` |
| |
| :return: True of False |
| :rtype: ``bool`` |
| |
| """ |
| |
| if server_id is None: |
| node = self.list_nodes(ex_name=name) |
| if len(node) > 1: |
| raise RuntimeError( |
| "Found more than one server Id, " |
| "please use one the following along" |
| " with name parameter: {}".format([n.id for n in node]) |
| ) |
| else: |
| node = [] |
| node.append(self.ex_get_node_by_id(server_id)) |
| update_node = ET.Element("initiateManualSnapshot", {"xmlns": TYPES_URN}) |
| update_node.set("serverId", node[0].id) |
| result = self.connection.request_with_orgId_api_2( |
| "snapshot/initiateManualSnapshot", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_create_snapshot_preview_server( |
| self, |
| snapshot_id, |
| server_name, |
| server_started, |
| nics_connected, |
| server_description=None, |
| target_cluster_id=None, |
| preserve_mac_addresses=None, |
| tag_key_name=None, |
| tag_key_id=None, |
| tag_value=None, |
| ): |
| """ |
| Create a snapshot preview of a server to clone to a new server |
| |
| :param snapshot_id: ID of the specific snahpshot to use in |
| creating preview server. |
| :type snapshot_id: ``str`` |
| |
| :param server_name: Name of the server created from the snapshot |
| :type ``str`` |
| |
| :param nics_connected: 'true' or 'false'. Should the nics be |
| automatically connected |
| :type ``str`` |
| |
| :param server_description: (Optional) A brief description of the |
| server. |
| :type ``str`` |
| |
| :param target_cluster_id: (Optional) The ID of a specific cluster as |
| opposed to the default. |
| :type ``str`` |
| |
| :param preserve_mac_address: (Optional) If set to 'true' will preserve |
| mac address from the original server. |
| :type ``str`` |
| |
| :param tag_key_name: (Optional) If tagging is desired and by name is |
| desired, set this to the tag name. |
| :type ``str`` |
| |
| :param tag_key_id: (Optional) If tagging is desired and by id is |
| desired, set this to the tag id. |
| :type ``str`` |
| |
| :param tag_value: (Optional) If using a tag_key_id or tag_key_name, |
| set the value fo tag_value. |
| |
| :rtype: ``str`` |
| """ |
| |
| create_preview = ET.Element( |
| "createSnapshotPreviewServer", |
| {"xmlns": TYPES_URN, "snapshotId": snapshot_id}, |
| ) |
| ET.SubElement(create_preview, "serverName").text = server_name |
| if server_description is not None: |
| ET.SubElement(create_preview, "serverDescription").text = server_description |
| if target_cluster_id is not None: |
| ET.SubElement(create_preview, "targetClusterId").text = target_cluster_id |
| ET.SubElement(create_preview, "serverStarted").text = server_started |
| ET.SubElement(create_preview, "nicsConnected").text = nics_connected |
| if preserve_mac_addresses is not None: |
| ET.SubElement(create_preview, "preserveMacAddresses").text = preserve_mac_addresses |
| if tag_key_name is not None: |
| tag_elem = ET.SubElement(create_preview, "tag") |
| ET.SubElement(tag_elem, "tagKeyName").text = tag_key_name |
| ET.SubElement(tag_elem, "value").text = tag_value |
| elif tag_key_id is not None: |
| tag_elem = ET.SubElement(create_preview, "tagById") |
| ET.SubElement(create_preview, "tagKeyId").text = tag_key_name |
| ET.SubElement(tag_elem, "value").text = tag_value |
| result = self.connection.request_with_orgId_api_2( |
| "snapshot/createSnapshotPreviewServer", |
| method="POST", |
| data=ET.tostring(create_preview), |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_migrate_preview_server(self, preview_id): |
| migrate_preview = ET.Element( |
| "migrateSnapshotPreviewServer", {"xmlns": TYPES_URN, "serverId": preview_id} |
| ) |
| result = self.connection.request_with_orgId_api_2( |
| "snapshot/migrateSnapshotPreviewServer", |
| method="POST", |
| data=ET.tostring(migrate_preview), |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_create_anti_affinity_rule(self, node_list): |
| """ |
| Edited to work with api 2.x. No longer supports 1.0 |
| Create an anti affinity rule given a list of nodes |
| Anti affinity rules ensure that servers will not reside |
| on the same VMware ESX host |
| |
| :param node_list: The list of nodes to create a rule for |
| :type node_list: ``list`` of :class:`Node` or |
| ``list`` of ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| if not isinstance(node_list, (list, tuple)): |
| raise TypeError("Node list must be a list or a tuple.") |
| anti_affinity_xml_request = ET.Element("createAntiAffinityRule", {"xmlns": TYPES_URN}) |
| for node in node_list: |
| ET.SubElement(anti_affinity_xml_request, "serverId").text = self._node_to_node_id(node) |
| result = self.connection.request_with_orgId_api_2( |
| "server/createAntiAffinityRule", |
| method="POST", |
| data=ET.tostring(anti_affinity_xml_request), |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "SUCCESS"] |
| |
| def ex_delete_anti_affinity_rule(self, anti_affinity_rule): |
| """ |
| Remove anti affinity rule |
| |
| :param anti_affinity_rule: The anti affinity rule to delete |
| :type anti_affinity_rule: :class:`NttCisAntiAffinityRule` or |
| ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| rule_id = anti_affinity_rule |
| |
| update_node = ET.Element("deleteAntiAffinityRule", {"xmlns": TYPES_URN}) |
| update_node.set("id", rule_id) |
| result = self.connection.request_with_orgId_api_2( |
| "server/deleteAntiAffinityRule", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "SUCCESS"] |
| |
| def ex_list_anti_affinity_rules( |
| self, |
| network=None, |
| network_domain=None, |
| node=None, |
| filter_id=None, |
| filter_state=None, |
| ): |
| """ |
| List anti affinity rules for a network, network domain, or node |
| |
| :param network: The network to list anti affinity rules for |
| One of network, network_domain, or node is required |
| :type network: :class:`NttCisNetwork` or ``str`` |
| |
| :param network_domain: The network domain to list anti affinity rules |
| One of network, network_domain, |
| or node is required |
| :type network_domain: :class:`NttCisNetworkDomain` or ``str`` |
| |
| :param node: The node to list anti affinity rules for |
| One of network, netwok_domain, or node is required |
| :type node: :class:`Node` or ``str`` |
| |
| :param filter_id: This will allow you to filter the rules |
| by this node id |
| :type filter_id: ``str`` |
| |
| :type filter_state: This will allow you to filter rules by |
| node state (i.e. NORMAL) |
| :type filter_state: ``str`` |
| |
| :rtype: ``list`` of :class:NttCisAntiAffinityRule` |
| """ |
| |
| not_none_arguments = [key for key in (network, network_domain, node) if key is not None] |
| if len(not_none_arguments) != 1: |
| raise ValueError("One and ONLY one of network, " "network_domain, or node must be set") |
| |
| params = {} |
| if network_domain is not None: |
| params["networkDomainId"] = self._network_domain_to_network_domain_id(network_domain) |
| if network is not None: |
| params["networkId"] = self._network_to_network_id(network) |
| if node is not None: |
| params["serverId"] = self._node_to_node_id(node) |
| if filter_id is not None: |
| params["id"] = filter_id |
| if filter_state is not None: |
| params["state"] = filter_state |
| |
| paged_result = self.connection.paginated_request_with_orgId_api_2( |
| "server/antiAffinityRule", method="GET", params=params |
| ) |
| |
| rules = [] |
| for result in paged_result: |
| rules.extend(self._to_anti_affinity_rules(result)) |
| return rules |
| |
| def ex_attach_node_to_vlan(self, node, vlan=None, private_ipv4=None): |
| """ |
| Attach a node to a VLAN by adding an additional NIC to |
| the node on the target VLAN. The IP will be automatically |
| assigned based on the VLAN IP network space. Alternatively, provide |
| a private IPv4 address instead of VLAN information, and this will |
| be assigned to the node on corresponding NIC. |
| |
| :param node: Node which should be used |
| :type node: :class:`Node` |
| |
| :param vlan: VLAN to attach the node to |
| (required unless private_ipv4) |
| :type vlan: :class:`NttCisVlan` |
| |
| :keyword private_ipv4: Private nic IPv4 Address |
| (required unless vlan) |
| :type private_ipv4: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request = ET.Element("addNic", {"xmlns": TYPES_URN}) |
| ET.SubElement(request, "serverId").text = node.id |
| nic = ET.SubElement(request, "nic") |
| |
| if vlan is not None: |
| ET.SubElement(nic, "vlanId").text = vlan.id |
| elif private_ipv4 is not None: |
| ET.SubElement(nic, "privateIpv4").text = private_ipv4 |
| else: |
| raise ValueError("One of vlan or primary_ipv4 " "must be specified") |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/addNic", method="POST", data=ET.tostring(request) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_destroy_nic(self, nic_id): |
| """ |
| Remove a NIC on a node, removing the node from a VLAN |
| |
| :param nic_id: The identifier of the NIC to remove |
| :type nic_id: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| request = ET.Element("removeNic", {"xmlns": TYPES_URN, "id": nic_id}) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/removeNic", method="POST", data=ET.tostring(request) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_networks(self, location=None): |
| """ |
| List networks deployed across all data center locations for your |
| organization. The response includes the location of each network. |
| |
| :param location: The target location |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :return: a list of NttCisNetwork objects |
| :rtype: ``list`` of :class:`NttCisNetwork` |
| """ |
| |
| return self.list_networks(location=location) |
| |
| def ex_create_network(self, location, name, description=None): |
| """ |
| Create a new network in an MCP 1.0 location |
| |
| :param location: The target location (MCP1) |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :param name: The name of the network |
| :type name: ``str`` |
| |
| :param description: Additional description of the network |
| :type description: ``str`` |
| |
| :return: A new instance of `NttCisNetwork` |
| :rtype: Instance of :class:`NttCisNetwork` |
| """ |
| |
| network_location = self._location_to_location_id(location) |
| |
| create_node = ET.Element("NewNetworkWithLocation", {"xmlns": NETWORK_NS}) |
| ET.SubElement(create_node, "name").text = name |
| if description is not None: |
| ET.SubElement(create_node, "description").text = description |
| ET.SubElement(create_node, "location").text = network_location |
| |
| self.connection.request_with_orgId_api_1( |
| "networkWithLocation", method="POST", data=ET.tostring(create_node) |
| ) |
| |
| # MCP1 API does not return the ID, but name is unique for location |
| network = list(filter(lambda x: x.name == name, self.ex_list_networks(location)))[0] |
| |
| return network |
| |
| def ex_delete_network(self, network): |
| """ |
| Delete a network from an MCP 1 data center |
| |
| :param network: The network to delete |
| :type network: :class:`NttCisNetwork` |
| |
| :rtype: ``bool`` |
| """ |
| |
| response = self.connection.request_with_orgId_api_1( |
| "network/%s?delete" % network.id, method="GET" |
| ).object |
| response_code = findtext(response, "result", GENERAL_NS) |
| return response_code == "SUCCESS" |
| |
| def ex_rename_network(self, network, new_name): |
| """ |
| Rename a network in MCP 1 data center |
| |
| :param network: The network to rename |
| :type network: :class:`NttCisNetwork` |
| |
| :param new_name: The new name of the network |
| :type new_name: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| response = self.connection.request_with_orgId_api_1( |
| "network/%s" % network.id, method="POST", data="name=%s" % new_name |
| ).object |
| response_code = findtext(response, "result", GENERAL_NS) |
| return response_code == "SUCCESS" |
| |
| def ex_get_network_domain(self, network_domain_id): |
| """ |
| Get an individual Network Domain, by identifier |
| |
| :param network_domain_id: The identifier of the network domain |
| :type network_domain_id: ``str`` |
| |
| :rtype: :class:`NttCisNetworkDomain` |
| """ |
| |
| locations = self.list_locations() |
| net = self.connection.request_with_orgId_api_2( |
| "network/networkDomain/%s" % network_domain_id |
| ).object |
| return self._to_network_domain(net, locations) |
| |
| def ex_list_network_domains(self, location=None, name=None, service_plan=None, state=None): |
| """ |
| List networks domains deployed across all data center locations domain. |
| |
| for your organization. |
| The response includes the location of each network |
| :param location: Only network domains in the location (optional) |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :param name: Only network domains of this name (optional) |
| :type name: ``str`` |
| |
| :param service_plan: Only network domains of this type (optional) |
| :type service_plan: ``str`` |
| |
| :param state: Only network domains in this state (optional) |
| :type state: ``str`` |
| |
| :return: a list of `NttCisNetwork` objects |
| :rtype: ``list`` of :class:`NttCisNetwork` |
| """ |
| |
| params = {} |
| if location is not None: |
| params["datacenterId"] = self._location_to_location_id(location) |
| if name is not None: |
| params["name"] = name |
| if service_plan is not None: |
| params["type"] = service_plan |
| if state is not None: |
| params["state"] = state |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/networkDomain", params=params |
| ).object |
| return self._to_network_domains(response) |
| |
| def ex_create_network_domain(self, location, name, service_plan, description=None): |
| """ |
| Deploy a new network domain to a data center |
| |
| :param location: The data center to list |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :param name: The name of the network domain to create |
| :type name: ``str`` |
| |
| :param service_plan: The service plan, either "ESSENTIALS" |
| or "ADVANCED" |
| :type service_plan: ``str`` |
| |
| :param description: An additional description of |
| the network domain |
| :type description: ``str`` |
| |
| :return: an instance of `NttCisNetworkDomain` |
| :rtype: :class:`NttCisNetworkDomain` |
| """ |
| |
| create_node = ET.Element("deployNetworkDomain", {"xmlns": TYPES_URN}) |
| ET.SubElement(create_node, "datacenterId").text = self._location_to_location_id(location) |
| |
| ET.SubElement(create_node, "name").text = name |
| if description is not None: |
| ET.SubElement(create_node, "description").text = description |
| ET.SubElement(create_node, "type").text = service_plan |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/deployNetworkDomain", method="POST", data=ET.tostring(create_node) |
| ).object |
| |
| network_domain_id = None |
| |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "networkDomainId": |
| network_domain_id = info.get("value") |
| |
| return NttCisNetworkDomain( |
| id=network_domain_id, |
| name=name, |
| description=description, |
| location=location, |
| status=NodeState.RUNNING, |
| plan=service_plan, |
| ) |
| |
| def ex_update_network_domain(self, network_domain): |
| """ |
| Update the properties of a network domain |
| |
| :param network_domain: The network domain with updated properties |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :return: an instance of `NttCisNetworkDomain` |
| :rtype: :class:`NttCisNetworkDomain` |
| """ |
| |
| edit_node = ET.Element("editNetworkDomain", {"xmlns": TYPES_URN}) |
| edit_node.set("id", network_domain.id) |
| ET.SubElement(edit_node, "name").text = network_domain.name |
| if network_domain.description is not None: |
| ET.SubElement(edit_node, "description").text = network_domain.description |
| ET.SubElement(edit_node, "type").text = network_domain.plan |
| |
| self.connection.request_with_orgId_api_2( |
| "network/editNetworkDomain", method="POST", data=ET.tostring(edit_node) |
| ).object |
| |
| return network_domain |
| |
| def ex_delete_network_domain(self, network_domain): |
| """ |
| Delete a network domain |
| |
| :param network_domain: The network domain to delete |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :rtype: ``bool`` |
| """ |
| |
| delete_node = ET.Element("deleteNetworkDomain", {"xmlns": TYPES_URN}) |
| delete_node.set("id", network_domain.id) |
| result = self.connection.request_with_orgId_api_2( |
| "network/deleteNetworkDomain", method="POST", data=ET.tostring(delete_node) |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_create_vlan( |
| self, |
| network_domain, |
| name, |
| private_ipv4_base_address, |
| description=None, |
| private_ipv4_prefix_size=24, |
| ): |
| """ |
| Deploy a new VLAN to a network domain |
| |
| :param network_domain: The network domain to add the VLAN to |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :param name: The name of the VLAN to create |
| :type name: ``str`` |
| |
| :param private_ipv4_base_address: The base IPv4 address |
| e.g. 192.168.1.0 |
| :type private_ipv4_base_address: ``str`` |
| |
| :param description: An additional description of the VLAN |
| :type description: ``str`` |
| |
| :param private_ipv4_prefix_size: The size of the IPv4 |
| address space, e.g 24 |
| :type private_ipv4_prefix_size: ``int`` |
| |
| :return: an instance of `NttCisVlan` |
| :rtype: :class:`NttCisVlan` |
| """ |
| |
| create_node = ET.Element("deployVlan", {"xmlns": TYPES_URN}) |
| ET.SubElement(create_node, "networkDomainId").text = network_domain.id |
| ET.SubElement(create_node, "name").text = name |
| if description is not None: |
| ET.SubElement(create_node, "description").text = description |
| ET.SubElement(create_node, "privateIpv4BaseAddress").text = private_ipv4_base_address |
| ET.SubElement(create_node, "privateIpv4PrefixSize").text = str(private_ipv4_prefix_size) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/deployVlan", method="POST", data=ET.tostring(create_node) |
| ).object |
| |
| vlan_id = None |
| |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "vlanId": |
| vlan_id = info.get("value") |
| |
| return self.ex_get_vlan(vlan_id) |
| |
| def ex_get_vlan(self, vlan_id): |
| """ |
| Get a single VLAN, by it's identifier |
| |
| :param vlan_id: The identifier of the VLAN |
| :type vlan_id: ``str`` |
| |
| :return: an instance of `NttCisVlan` |
| :rtype: :class:`NttCisVlan` |
| """ |
| |
| locations = self.list_locations() |
| vlan = self.connection.request_with_orgId_api_2("network/vlan/%s" % vlan_id).object |
| return self._to_vlan(vlan, locations) |
| |
| def ex_update_vlan(self, vlan): |
| """ |
| Updates the properties of the given VLAN |
| Only name and description are updated |
| |
| :param vlan: The VLAN to update |
| :type vlan: :class:`NttCisetworkDomain` |
| |
| :return: an instance of `NttCisVlan` |
| :rtype: :class:`NttCisVlan` |
| """ |
| |
| edit_node = ET.Element("editVlan", {"xmlns": TYPES_URN}) |
| edit_node.set("id", vlan.id) |
| ET.SubElement(edit_node, "name").text = vlan.name |
| if vlan.description is not None: |
| ET.SubElement(edit_node, "description").text = vlan.description |
| |
| self.connection.request_with_orgId_api_2( |
| "network/editVlan", method="POST", data=ET.tostring(edit_node) |
| ).object |
| |
| return vlan |
| |
| def ex_expand_vlan(self, vlan): |
| """ |
| Expands the VLAN to the prefix size in private_ipv4_range_size |
| The expansion will |
| not be permitted if the proposed IP space overlaps with an |
| already deployed VLANs IP space. |
| |
| :param vlan: The VLAN to update |
| :type vlan: :class:`NttCisNetworkDomain` |
| |
| :return: an instance of `NttCisVlan` |
| :rtype: :class:`NttCisVlan` |
| """ |
| |
| edit_node = ET.Element("expandVlan", {"xmlns": TYPES_URN}) |
| edit_node.set("id", vlan.id) |
| ET.SubElement(edit_node, "privateIpv4PrefixSize").text = vlan.private_ipv4_range_size |
| |
| self.connection.request_with_orgId_api_2( |
| "network/expandVlan", method="POST", data=ET.tostring(edit_node) |
| ).object |
| |
| return vlan |
| |
| def ex_delete_vlan(self, vlan): |
| """ |
| Deletes an existing VLAN |
| |
| :param vlan: The VLAN to delete |
| :type vlan: :class:`DNttCisNetworkDomain` |
| |
| :rtype: ``bool`` |
| """ |
| |
| delete_node = ET.Element("deleteVlan", {"xmlns": TYPES_URN}) |
| delete_node.set("id", vlan.id) |
| result = self.connection.request_with_orgId_api_2( |
| "network/deleteVlan", method="POST", data=ET.tostring(delete_node) |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_vlans( |
| self, |
| location=None, |
| network_domain=None, |
| name=None, |
| ipv4_address=None, |
| ipv6_address=None, |
| state=None, |
| ): |
| """ |
| List VLANs available, can filter by location and/or network domain |
| |
| :param location: Only VLANs in this location (optional) |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :param network_domain: Only VLANs in this domain (optional) |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :param name: Only VLANs with this name (optional) |
| :type name: ``str`` |
| |
| :param ipv4_address: Only VLANs with this ipv4 address (optional) |
| :type ipv4_address: ``str`` |
| |
| :param ipv6_address: Only VLANs with this ipv6 address (optional) |
| :type ipv6_address: ``str`` |
| |
| :param state: Only VLANs with this state (optional) |
| :type state: ``str`` |
| |
| :return: a list of NttCisVlan objects |
| :rtype: ``list`` of :class:`NttCisVlan` |
| """ |
| |
| params = {} |
| if location is not None: |
| params["datacenterId"] = self._location_to_location_id(location) |
| if network_domain is not None: |
| params["networkDomainId"] = self._network_domain_to_network_domain_id(network_domain) |
| if name is not None: |
| params["name"] = name |
| if ipv4_address is not None: |
| params["privateIpv4Address"] = ipv4_address |
| if ipv6_address is not None: |
| params["ipv6Address"] = ipv6_address |
| if state is not None: |
| params["state"] = state |
| response = self.connection.request_with_orgId_api_2("network/vlan", params=params).object |
| return self._to_vlans(response) |
| |
| def ex_add_public_ip_block_to_network_domain(self, network_domain): |
| add_node = ET.Element("addPublicIpBlock", {"xmlns": TYPES_URN}) |
| ET.SubElement(add_node, "networkDomainId").text = network_domain.id |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/addPublicIpBlock", method="POST", data=ET.tostring(add_node) |
| ).object |
| |
| block_id = None |
| |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "ipBlockId": |
| block_id = info.get("value") |
| return self.ex_get_public_ip_block(block_id) |
| |
| def ex_list_public_ip_blocks(self, network_domain): |
| params = {} |
| params["networkDomainId"] = network_domain.id |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/publicIpBlock", params=params |
| ).object |
| return self._to_ip_blocks(response) |
| |
| def ex_get_public_ip_block(self, block_id): |
| locations = self.list_locations() |
| block = self.connection.request_with_orgId_api_2( |
| "network/publicIpBlock/%s" % block_id |
| ).object |
| return self._to_ip_block(block, locations) |
| |
| def ex_delete_public_ip_block(self, block): |
| delete_node = ET.Element("removePublicIpBlock", {"xmlns": TYPES_URN}) |
| delete_node.set("id", block.id) |
| result = self.connection.request_with_orgId_api_2( |
| "network/removePublicIpBlock", method="POST", data=ET.tostring(delete_node) |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| # 09/10/18 Adding private IPv4 and IPv6 addressing capability |
| def ex_reserve_ip(self, vlan, ip, description): |
| vlan_id = self._vlan_to_vlan_id(vlan) |
| if re.match(r"(\d+\.){3}", ip): |
| private_ip = ET.Element("reservePrivateIpv4Address", {"xmlns": TYPES_URN}) |
| resource = "network/reservePrivateIpv4Address" |
| elif re.search(r":", ip): |
| private_ip = ET.Element("reserveIpv6Address", {"xmlns": TYPES_URN}) |
| resource = "network/reserveIpv6Address" |
| ET.SubElement(private_ip, "vlanId").text = vlan_id |
| ET.SubElement(private_ip, "ipAddress").text = ip |
| if description is not None: |
| ET.SubElement(private_ip, "description").text = description |
| |
| result = self.connection.request_with_orgId_api_2( |
| resource, method="POST", data=ET.tostring(private_ip) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_unreserve_ip_addresses(self, vlan, ip): |
| vlan_id = self._vlan_to_vlan_id(vlan) |
| if re.match(r"(\d+\.){3}", ip): |
| private_ip = ET.Element("unreservePrivateIpv4Address", {"xmlns": TYPES_URN}) |
| resource = "network/reservePrivateIpv4Address" |
| elif re.search(r":", ip): |
| private_ip = ET.Element("unreserveIpv6Address", {"xmlns": TYPES_URN}) |
| resource = "network/unreserveIpv6Address" |
| ET.SubElement(private_ip, "vlanId").text = vlan_id |
| ET.SubElement(private_ip, "ipAddress").text = ip |
| result = self.connection.request_with_orgId_api_2( |
| resource, method="POST", data=ET.tostring(private_ip) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_reserved_ipv4(self, vlan=None, datacenter_id=None): |
| if vlan is not None: |
| vlan_id = self._vlan_to_vlan_id(vlan) |
| params = {"vlanId": vlan_id} |
| response = self.connection.request_with_orgId_api_2( |
| "network/reservedPrivateIpv4Address", params=params |
| ).object |
| |
| elif datacenter_id is not None: |
| params = {"datacenterId": datacenter_id} |
| response = self.connection.request_with_orgId_api_2( |
| "network/reservedPrivateIpv4Address", params=params |
| ).object |
| else: |
| response = self.connection.request_with_orgId_api_2( |
| "network/reservedPrivateIpv4Address" |
| ).object |
| addresses = self._to_ipv4_addresses(response) |
| return addresses |
| |
| def ex_list_reserved_ipv6(self, vlan=None, datacenter_id=None): |
| if vlan is not None: |
| vlan_id = self._vlan_to_vlan_id(vlan) |
| params = {"vlanId": vlan_id} |
| response = self.connection.request_with_orgId_api_2( |
| "network/reservedIpv6Address", params=params |
| ).object |
| |
| elif datacenter_id is not None: |
| params = {"datacenterId": datacenter_id} |
| response = self.connection.request_with_orgId_api_2( |
| "network/reservedIpv6Address", params=params |
| ).object |
| else: |
| response = self.connection.request_with_orgId_api_2( |
| "network/reservedIpv6Address" |
| ).object |
| addresses = self._to_ipv6_addresses(response) |
| return addresses |
| |
| def ex_get_node_by_id(self, id): |
| node = self.connection.request_with_orgId_api_2("server/server/%s" % id).object |
| return self._to_node(node) |
| |
| def ex_list_firewall_rules(self, network_domain, page_size=50, page_number=1): |
| params = {"pageSize": page_size, "pageNumber": page_number} |
| params["networkDomainId"] = self._network_domain_to_network_domain_id(network_domain) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/firewallRule", params=params |
| ).object |
| return self._to_firewall_rules(response, network_domain) |
| |
| def ex_create_firewall_rule( |
| self, |
| network_domain, |
| name, |
| action, |
| ip_version, |
| protocol, |
| source_addr, |
| dest_addr, |
| position, |
| enabled=1, |
| position_relative_to_rule=None, |
| ): |
| """ |
| Creates a firewall rule |
| |
| :param network_domain: The network domain in which to create |
| the firewall rule |
| :type network_domain: :class:`NttCisNetworkDomain` or ``str`` |
| |
| :param name: The rule's name |
| :type name: ``str`` |
| |
| :param action: 'ACCEPT_DECISIVELY' or 'DROP' |
| :type action: ``str`` |
| |
| :param ip_version: 'IPV4' or 'IPV6' |
| :type ip_version: ``str`` |
| |
| :param protocol: One of 'IP', 'ICMP', 'TCP', or 'UDP' |
| :type protocol: ``str`` |
| |
| :param source_addr: The source address, which must be an |
| NttCisFirewallAddress instance |
| :type source_addr: ``NttCisFirewallAddress`` |
| |
| :param dest_addr: The destination address, which must be an |
| NttCisFirewallAddress instance |
| :type dest_addr: `NttCisFirewallAddress`` |
| |
| :param position: The position in which to create the rule |
| There are two types of positions |
| with position_relative_to_rule arg and without it |
| With: 'BEFORE' or 'AFTER' |
| Without: 'FIRST' or 'LAST' |
| :type position: ``str`` |
| |
| :param enabled: Firewall rule is enabled upon creation. |
| Set to 1 for true or 0 for false. |
| :type enabled: ``int`` |
| |
| :param position_relative_to_rule: The rule or rule name in |
| which to decide positioning by |
| :type position_relative_to_rule: |
| ::class:`NttCisFirewallRule` or ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| positions_without_rule = ("FIRST", "LAST") |
| positions_with_rule = ("BEFORE", "AFTER") |
| |
| create_node = ET.Element("createFirewallRule", {"xmlns": TYPES_URN}) |
| ET.SubElement( |
| create_node, "networkDomainId" |
| ).text = self._network_domain_to_network_domain_id(network_domain) |
| ET.SubElement(create_node, "name").text = name |
| ET.SubElement(create_node, "action").text = action |
| ET.SubElement(create_node, "ipVersion").text = ip_version |
| ET.SubElement(create_node, "protocol").text = protocol |
| # Setup source port rule |
| source = ET.SubElement(create_node, "source") |
| if source_addr.address_list_id is not None: |
| source_ip = ET.SubElement(source, "ipAddressListId") |
| source_ip.text = source_addr.address_list_id |
| else: |
| source_ip = ET.SubElement(source, "ip") |
| if source_addr.any_ip: |
| source_ip.set("address", "ANY") |
| else: |
| source_ip.set("address", source_addr.ip_address) |
| if source_addr.ip_prefix_size is not None: |
| source_ip.set("prefixSize", str(source_addr.ip_prefix_size)) |
| if source_addr.port_list_id is not None: |
| source_port = ET.SubElement(source, "portListId") |
| source_port.text = source_addr.port_list_id |
| else: |
| if source_addr.port_begin is not None: |
| source_port = ET.SubElement(source, "port") |
| source_port.set("begin", source_addr.port_begin) |
| if source_addr.port_end is not None: |
| source_port.set("end", source_addr.port_end) |
| # Setup destination port rule |
| dest = ET.SubElement(create_node, "destination") |
| if dest_addr.address_list_id is not None: |
| dest_ip = ET.SubElement(dest, "ipAddressListId") |
| dest_ip.text = dest_addr.address_list_id |
| else: |
| dest_ip = ET.SubElement(dest, "ip") |
| if dest_addr.any_ip: |
| dest_ip.set("address", "ANY") |
| else: |
| dest_ip.set("address", dest_addr.ip_address) |
| if dest_addr.ip_prefix_size is not None: |
| dest_ip.set("prefixSize", dest_addr.ip_prefix_size) |
| if dest_addr.port_list_id is not None: |
| dest_port = ET.SubElement(dest, "portListId") |
| dest_port.text = dest_addr.port_list_id |
| else: |
| if dest_addr.port_begin is not None: |
| dest_port = ET.SubElement(dest, "port") |
| dest_port.set("begin", dest_addr.port_begin) |
| if dest_addr.port_end is not None: |
| dest_port.set("end", dest_addr.port_end) |
| # Set up positioning of rule |
| ET.SubElement(create_node, "enabled").text = str(enabled) |
| placement = ET.SubElement(create_node, "placement") |
| if position_relative_to_rule is not None: |
| if position not in positions_with_rule: |
| raise ValueError( |
| "When position_relative_to_rule is specified" |
| " position must be %s" % ", ".join(positions_with_rule) |
| ) |
| if isinstance(position_relative_to_rule, NttCisFirewallRule): |
| rule_name = position_relative_to_rule.name |
| else: |
| rule_name = position_relative_to_rule |
| placement.set("relativeToRule", rule_name) |
| else: |
| if position not in positions_without_rule: |
| raise ValueError( |
| "When position_relative_to_rule is not" |
| " specified position must be %s" % ", ".join(positions_without_rule) |
| ) |
| placement.set("position", position) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/createFirewallRule", method="POST", data=ET.tostring(create_node) |
| ).object |
| |
| rule_id = None |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "firewallRuleId": |
| rule_id = info.get("value") |
| rule = self.ex_get_firewall_rule(network_domain, rule_id) |
| return rule |
| |
| def ex_edit_firewall_rule(self, rule, position=None, relative_rule_for_position=None): |
| """ |
| Edit a firewall rule |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| networkDomainName][0] |
| >>> |
| >>> |
| >>> # List firewall rules |
| >>> firewall_rules = driver.ex_list_firewall_rules(my_network_domain) |
| >>> |
| >>> # Get Firewall Rule by name |
| >>> pprint("List specific firewall rule by name") |
| >>> fire_rule_under_test = (list(filter(lambda x: x.name == |
| 'My_New_Firewall_Rule', firewall_rules))[0]) |
| >>> pprint(fire_rule_under_test.source) |
| >>> pprint(fire_rule_under_test.destination) |
| >>> |
| >>> # Edit Firewall |
| >>> fire_rule_under_test.destination.address_list_id = |
| '5e7c323f-c885-4e4b-9a27-94c44217dbd3' |
| >>> fire_rule_under_test.destination.port_list_id = |
| 'b6557c5a-45fa-4138-89bd-8fe68392691b' |
| >>> result = driver.ex_edit_firewall_rule(fire_rule_under_test, 'LAST') |
| >>> pprint(result) |
| |
| :param rule: (required) The rule in which to create |
| :type rule: :class:`DNttCisFirewallRule` |
| |
| :param position: (required) There are two types of positions |
| with position_relative_to_rule arg and without it |
| With: 'BEFORE' or 'AFTER' |
| Without: 'FIRST' or 'LAST' |
| :type position: ``str`` |
| |
| :param relative_rule_for_position: (optional) The rule or rule name in |
| which to decide the relative rule |
| for positioning. |
| :type relative_rule_for_position: |
| :class:`NttCisFirewallRule` or ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| positions_without_rule = ("FIRST", "LAST") |
| positions_with_rule = ("BEFORE", "AFTER") |
| |
| edit_node = ET.Element("editFirewallRule", {"xmlns": TYPES_URN, "id": rule.id}) |
| ET.SubElement(edit_node, "action").text = rule.action |
| ET.SubElement(edit_node, "protocol").text = rule.protocol |
| |
| # Source address |
| source = ET.SubElement(edit_node, "source") |
| if rule.source.address_list_id is not None: |
| source_ip = ET.SubElement(source, "ipAddressListId") |
| source_ip.text = rule.source.address_list_id |
| else: |
| source_ip = ET.SubElement(source, "ip") |
| if rule.source.any_ip: |
| source_ip.set("address", "ANY") |
| else: |
| source_ip.set("address", rule.source.ip_address) |
| if rule.source.ip_prefix_size is not None: |
| source_ip.set("prefixSize", str(rule.source.ip_prefix_size)) |
| |
| # Setup source port rule |
| if rule.source.port_list_id is not None: |
| source_port = ET.SubElement(source, "portListId") |
| source_port.text = rule.source.port_list_id |
| else: |
| if rule.source.port_begin is not None: |
| source_port = ET.SubElement(source, "port") |
| source_port.set("begin", rule.source.port_begin) |
| if rule.source.port_end is not None: |
| source_port.set("end", rule.source.port_end) |
| # Setup destination port rule |
| dest = ET.SubElement(edit_node, "destination") |
| if rule.destination.address_list_id is not None: |
| dest_ip = ET.SubElement(dest, "ipAddressListId") |
| dest_ip.text = rule.destination.address_list_id |
| else: |
| dest_ip = ET.SubElement(dest, "ip") |
| if rule.destination.any_ip: |
| dest_ip.set("address", "ANY") |
| else: |
| dest_ip.set("address", rule.destination.ip_address) |
| if rule.destination.ip_prefix_size is not None: |
| dest_ip.set("prefixSize", rule.destination.ip_prefix_size) |
| if rule.destination.port_list_id is not None: |
| dest_port = ET.SubElement(dest, "portListId") |
| dest_port.text = rule.destination.port_list_id |
| else: |
| if rule.destination.port_begin is not None: |
| dest_port = ET.SubElement(dest, "port") |
| dest_port.set("begin", rule.destination.port_begin) |
| if rule.destination.port_end is not None: |
| dest_port.set("end", rule.destination.port_end) |
| # Set up positioning of rule |
| ET.SubElement(edit_node, "enabled").text = str(rule.enabled).lower() |
| # changing placement to an option |
| if position is not None: |
| placement = ET.SubElement(edit_node, "placement") |
| if relative_rule_for_position is not None: |
| if position not in positions_with_rule: |
| raise ValueError( |
| "When position_relative_to_rule is specified" |
| " position must be %s" % ", ".join(positions_with_rule) |
| ) |
| if isinstance(relative_rule_for_position, NttCisFirewallRule): |
| rule_name = relative_rule_for_position.name |
| else: |
| rule_name = relative_rule_for_position |
| placement.set("relativeToRule", rule_name) |
| else: |
| if position not in positions_without_rule: |
| raise ValueError( |
| "When position_relative_to_rule is not" |
| " specified position must be %s" % ", ".join(positions_without_rule) |
| ) |
| placement.set("position", position) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/editFirewallRule", method="POST", data=ET.tostring(edit_node) |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_get_firewall_rule(self, network_domain, rule_id): |
| locations = self.list_locations() |
| rule = self.connection.request_with_orgId_api_2("network/firewallRule/%s" % rule_id).object |
| return self._to_firewall_rule(rule, locations, network_domain) |
| |
| def ex_set_firewall_rule_state(self, rule, state): |
| """ |
| Change the state (enabled or disabled) of a rule |
| |
| :param rule: The rule to delete |
| :type rule: :class:`NttCisFirewallRule` |
| |
| :param state: The desired state enabled (True) or disabled (False) |
| :type state: ``bool`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("editFirewallRule", {"xmlns": TYPES_URN}) |
| update_node.set("id", rule.id) |
| ET.SubElement(update_node, "enabled").text = str(state).lower() |
| result = self.connection.request_with_orgId_api_2( |
| "network/editFirewallRule", method="POST", data=ET.tostring(update_node) |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_delete_firewall_rule(self, rule): |
| """ |
| Delete a firewall rule |
| |
| :param rule: The rule to delete |
| :type rule: :class:`NttCisFirewallRule` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("deleteFirewallRule", {"xmlns": TYPES_URN}) |
| update_node.set("id", rule.id) |
| result = self.connection.request_with_orgId_api_2( |
| "network/deleteFirewallRule", method="POST", data=ET.tostring(update_node) |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_create_nat_rule(self, network_domain, internal_ip, external_ip): |
| """ |
| Create a NAT rule |
| |
| :param network_domain: The network domain the rule belongs to |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :param internal_ip: The IPv4 address internally |
| :type internal_ip: ``str`` |
| |
| :param external_ip: The IPv4 address externally |
| :type external_ip: ``str`` |
| |
| :rtype: :class:`NttCisNatRule` |
| """ |
| |
| create_node = ET.Element("createNatRule", {"xmlns": TYPES_URN}) |
| ET.SubElement(create_node, "networkDomainId").text = network_domain.id |
| ET.SubElement(create_node, "internalIp").text = internal_ip |
| ET.SubElement(create_node, "externalIp").text = external_ip |
| result = self.connection.request_with_orgId_api_2( |
| "network/createNatRule", method="POST", data=ET.tostring(create_node) |
| ).object |
| |
| rule_id = None |
| for info in findall(result, "info", TYPES_URN): |
| if info.get("name") == "natRuleId": |
| rule_id = info.get("value") |
| |
| return NttCisNatRule( |
| id=rule_id, |
| network_domain=network_domain, |
| internal_ip=internal_ip, |
| external_ip=external_ip, |
| status=NodeState.RUNNING, |
| ) |
| |
| def ex_list_nat_rules(self, network_domain): |
| """ |
| Get NAT rules for the network domain |
| |
| :param network_domain: The network domain the rules belongs to |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :rtype: ``list`` of :class:`NttCisNatRule` |
| """ |
| |
| params = {} |
| params["networkDomainId"] = network_domain.id |
| |
| response = self.connection.request_with_orgId_api_2("network/natRule", params=params).object |
| return self._to_nat_rules(response, network_domain) |
| |
| def ex_get_nat_rule(self, network_domain, rule_id): |
| """ |
| Get a NAT rule by ID |
| |
| :param network_domain: The network domain the rule belongs to |
| :type network_domain: :class:`NttCisNetworkDomain` |
| |
| :param rule_id: The ID of the NAT rule to fetch |
| :type rule_id: ``str`` |
| |
| :rtype: :class:`NttCisNatRule` |
| """ |
| |
| rule = self.connection.request_with_orgId_api_2("network/natRule/%s" % rule_id).object |
| return self._to_nat_rule(rule, network_domain) |
| |
| def ex_delete_nat_rule(self, rule): |
| """ |
| Delete an existing NAT rule |
| |
| :param rule: The rule to delete |
| :type rule: :class:`NttCisNatRule` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("deleteNatRule", {"xmlns": TYPES_URN}) |
| update_node.set("id", rule.id) |
| result = self.connection.request_with_orgId_api_2( |
| "network/deleteNatRule", method="POST", data=ET.tostring(update_node) |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_get_location_by_id(self, id): |
| """ |
| Get location by ID. |
| |
| :param id: ID of the node location which should be used |
| :type id: ``str`` |
| |
| :rtype: :class:`NodeLocation` |
| """ |
| |
| location = None |
| if id is not None: |
| location = self.list_locations(ex_id=id)[0] |
| return location |
| |
| def ex_wait_for_state(self, state, func, poll_interval=2, timeout=60, *args, **kwargs): |
| """ |
| Wait for the function which returns a instance |
| with field status to match |
| |
| Keep polling func until one of the desired states is matched |
| |
| :param state: Either the desired state (`str`) or a `list` of states |
| :type state: ``str`` or ``list`` |
| |
| :param func: The function to call, e.g. ex_get_vlan |
| :type func: ``function`` |
| |
| :param poll_interval: The number of seconds to wait between checks |
| :type poll_interval: `int` |
| |
| :param timeout: The total number of seconds to wait to reach a state |
| :type timeout: `int` |
| |
| :param args: The arguments for func |
| :type args: Positional arguments |
| |
| :param kwargs: The arguments for func |
| :type kwargs: Keyword arguments |
| """ |
| |
| return self.connection.wait_for_state(state, func, poll_interval, timeout, *args, **kwargs) |
| |
| def ex_enable_monitoring(self, node, service_plan="ESSENTIALS"): |
| """ |
| Enables cloud monitoring on a node |
| |
| :param node: The node to monitor |
| :type node: :class:`Node` |
| |
| :param service_plan: The service plan, one of ESSENTIALS or |
| ADVANCED |
| :type service_plan: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("enableServerMonitoring", {"xmlns": TYPES_URN}) |
| update_node.set("id", node.id) |
| ET.SubElement(update_node, "servicePlan").text = service_plan |
| result = self.connection.request_with_orgId_api_2( |
| "server/enableServerMonitoring", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_update_monitoring_plan(self, node, service_plan="ESSENTIALS"): |
| """ |
| Updates the service plan on a node with monitoring |
| |
| :param node: The node to monitor |
| :type node: :class:`Node` |
| |
| :param service_plan: The service plan, one of ESSENTIALS or |
| ADVANCED |
| :type service_plan: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("changeServerMonitoringPlan", {"xmlns": TYPES_URN}) |
| update_node.set("id", node.id) |
| ET.SubElement(update_node, "servicePlan").text = service_plan |
| result = self.connection.request_with_orgId_api_2( |
| "server/changeServerMonitoringPlan", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_disable_monitoring(self, node): |
| """ |
| Disables cloud monitoring for a node |
| |
| :param node: The node to stop monitoring |
| :type node: :class:`Node` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update_node = ET.Element("disableServerMonitoring", {"xmlns": TYPES_URN}) |
| update_node.set("id", node.id) |
| result = self.connection.request_with_orgId_api_2( |
| "server/disableServerMonitoring", |
| method="POST", |
| data=ET.tostring(update_node), |
| ).object |
| |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_add_scsi_controller_to_node(self, server_id, adapter_type, bus_number=None): |
| """ |
| Added 8/27/18: Adds a SCSI Controller by node id |
| :param server_id: server id |
| :param adapter_type: the type of SCSI Adapter, i.e., LSI_LOGIC_PARALLEL |
| :param bus_number: optional number of server's bus |
| :return: whether addition is in progress or 'OK' otherwise false |
| """ |
| |
| update_node = ET.Element("addScsiController", {"xmlns": TYPES_URN}) |
| ET.SubElement(update_node, "serverId").text = server_id |
| ET.SubElement(update_node, "adapterType").text = adapter_type |
| if bus_number is not None: |
| ET.SubElement(update_node, "busNumber").text = bus_number |
| |
| result = self.connection.request_with_orgId_api_2( |
| "server/addScsiController", method="POST", data=ET.tostring(update_node) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_remove_scsi_controller(self, controller_id): |
| """ |
| Added 8/27/18: Adds a SCSI Controller by node id |
| :param controller_id: Scsi controller's id |
| :return: whether addition is in progress or 'OK' otherwise false |
| """ |
| |
| update_node = ET.Element("removeScsiController", {"xmlns": TYPES_URN}) |
| update_node.set("id", controller_id) |
| result = self.connection.request_with_orgId_api_2( |
| "server/removeScsiController", method="POST", data=ET.tostring(update_node) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_add_storage_to_node( |
| self, amount, node=None, speed="STANDARD", controller_id=None, scsi_id=None |
| ): |
| """ |
| Updated 8/23/18 |
| Add storage to the node |
| One of node or controller_id must be selected |
| |
| :param node: The server to add storage to (required if |
| controller_id is not used |
| :type node: :class:`Node` |
| |
| :param amount: The amount of storage to add, in GB |
| :type amount: ``int`` |
| |
| :param speed: The disk speed type |
| :type speed: ``str`` |
| |
| :param conrollter_id: The disk may be added using the |
| cotnroller id (required if node |
| object is not used) |
| :type controller_id: ``str`` |
| |
| :param scsi_id: The target SCSI ID (optional) |
| :type scsi_id: ``int`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| if (node is None and controller_id is None) or ( |
| node is not None and controller_id is not None |
| ): |
| raise RuntimeError("Either a node or a controller " "id must be specified") |
| |
| update_node = ET.Element("addDisk", {"xmlns": TYPES_URN}) |
| if node is not None: |
| ET.SubElement(update_node, "serverId").text = node.id |
| elif controller_id is not None: |
| scsi_node = ET.Element("scsiController") |
| ET.SubElement(scsi_node, "controllerId").text = controller_id |
| update_node.insert(1, scsi_node) |
| ET.SubElement(update_node, "sizeGb").text = str(amount) |
| ET.SubElement(update_node, "speed").text = speed.upper() |
| if scsi_id is not None: |
| ET.SubElement(update_node, "scsiId").text = str(scsi_id) |
| |
| result = self.connection.request_with_orgId_api_2( |
| "server/addDisk", method="POST", data=ET.tostring(update_node) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_remove_storage_from_node(self, node, scsi_id): |
| """ |
| Remove storage from a node |
| |
| :param node: The server to add storage to |
| :type node: :class:`Node` |
| |
| :param scsi_id: The ID of the disk to remove |
| :type scsi_id: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| disk = [disk for disk in node.extra["disks"] if disk.scsi_id == scsi_id][0] |
| return self.ex_remove_storage(disk.id) |
| |
| def ex_remove_storage(self, disk_id): |
| """ |
| Remove storage from a node |
| |
| :param node: The server to add storage to |
| :type node: :class:`Node` |
| |
| :param disk_id: The ID of the disk to remove |
| :type disk_id: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| remove_disk = ET.Element("removeDisk", {"xmlns": TYPES_URN}) |
| remove_disk.set("id", disk_id) |
| result = self.connection.request_with_orgId_api_2( |
| "server/removeDisk", method="POST", data=ET.tostring(remove_disk) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_change_storage_speed(self, disk_id, speed, iops=None): |
| """ |
| Change the speed (disk tier) of a disk |
| |
| :param node: The server to change the disk speed of |
| :type node: :class:`Node` |
| |
| :param disk_id: The ID of the disk to change |
| :type disk_id: ``str`` |
| |
| :param speed: The disk speed type e.g. STANDARD |
| :type speed: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| create_node = ET.Element("changeDiskSpeed", {"xmlns": TYPES_URN}) |
| create_node.set("id", disk_id) |
| ET.SubElement(create_node, "speed").text = speed |
| if iops is not None: |
| ET.SubElement(create_node, "iops").text = str(iops) |
| result = self.connection.request_with_orgId_api_2( |
| "server/changeDiskSpeed", method="POST", data=ET.tostring(create_node) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "SUCCESS"] |
| |
| def ex_change_storage_size(self, disk_id, size): |
| """ |
| Change the size of a disk |
| |
| :param node: The server to change the disk of |
| :type node: :class:`Node` |
| |
| :param disk_id: The ID of the disk to resize |
| :type disk_id: ``str`` |
| |
| :param size: The disk size in GB |
| :type size: ``int`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| create_node = ET.Element("expandDisk", {"xmlns": TYPES_URN, "id": disk_id}) |
| ET.SubElement(create_node, "newSizeGb").text = str(size) |
| """ |
| This code is for version 1 of MCP, need version 2 |
| result = self.connection.request_with_orgId_api_1( |
| 'server/%s/disk/%s/changeSize' % |
| (node.id, disk_id), |
| method='POST', |
| data=ET.tostring(create_node)).object |
| """ |
| result = self.connection.request_with_orgId_api_2( |
| "server/expandDisk", method="POST", data=ET.tostring(create_node) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_reconfigure_node( |
| self, |
| node, |
| memory_gb=None, |
| cpu_count=None, |
| cores_per_socket=None, |
| cpu_performance=None, |
| ): |
| """ |
| Reconfigure the virtual hardware specification of a node |
| |
| :param node: The server to change |
| :type node: :class:`Node` |
| |
| :param memory_gb: The amount of memory in GB (optional) |
| :type memory_gb: ``int`` |
| |
| :param cpu_count: The number of CPU (optional) |
| :type cpu_count: ``int`` |
| |
| :param cores_per_socket: Number of CPU cores per socket (optional) |
| :type cores_per_socket: ``int`` |
| |
| :param cpu_performance: CPU Performance type (optional) |
| :type cpu_performance: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| update = ET.Element("reconfigureServer", {"xmlns": TYPES_URN}) |
| update.set("id", node.id) |
| if memory_gb is not None: |
| ET.SubElement(update, "memoryGb").text = str(memory_gb) |
| if cpu_count is not None: |
| ET.SubElement(update, "cpuCount").text = str(cpu_count) |
| if cpu_performance is not None: |
| ET.SubElement(update, "cpuSpeed").text = cpu_performance |
| if cores_per_socket is not None: |
| ET.SubElement(update, "coresPerSocket").text = str(cores_per_socket) |
| result = self.connection.request_with_orgId_api_2( |
| "server/reconfigureServer", method="POST", data=ET.tostring(update) |
| ).object |
| response_code = findtext(result, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_clone_node_to_image( |
| self, |
| node, |
| image_name, |
| image_description=None, |
| cluster_id=None, |
| is_guest_Os_Customization=None, |
| tag_key_id=None, |
| tag_value=None, |
| ): |
| """ |
| Clone a server into a customer image. |
| |
| :param node: The server to clone |
| :type node: :class:`Node` |
| |
| :param image_name: The name of the clone image |
| :type image_name: ``str`` |
| |
| :param description: The description of the image |
| :type description: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| if image_description is None: |
| image_description = "" |
| |
| node_id = self._node_to_node_id(node) |
| """ |
| Removing anything below 2.4 |
| # Version 2.3 and lower |
| if LooseVersion(self.connection.active_api_version) < LooseVersion( |
| '2.4'): |
| response = self.connection.request_with_orgId_api_1( |
| 'server/%s?clone=%s&desc=%s' % |
| (node_id, image_name, image_description)).object |
| # Version 2.4 and higher |
| else: |
| """ |
| clone_server_elem = ET.Element("cloneServer", {"id": node_id, "xmlns": TYPES_URN}) |
| |
| ET.SubElement(clone_server_elem, "imageName").text = image_name |
| |
| if image_description is not None: |
| ET.SubElement(clone_server_elem, "description").text = image_description |
| |
| if cluster_id is not None: |
| ET.SubElement(clone_server_elem, "clusterId").text = cluster_id |
| |
| if is_guest_Os_Customization is not None: |
| ET.SubElement( |
| clone_server_elem, "guestOsCustomization" |
| ).text = is_guest_Os_Customization |
| |
| if tag_key_id is not None: |
| tag_elem = ET.SubElement(clone_server_elem, "tagById") |
| ET.SubElement(tag_elem, "tagKeyId").text = tag_key_id |
| |
| if tag_value is not None: |
| ET.SubElement(tag_elem, "value").text = tag_value |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/cloneServer", method="POST", data=ET.tostring(clone_server_elem) |
| ).object |
| """ |
| removing references to anything lower than 2.4 |
| # Version 2.3 and lower |
| if LooseVersion(self.connection.active_api_version) < LooseVersion( |
| '2.4'): |
| response_code = findtext(response, 'result', GENERAL_NS) |
| else: |
| """ |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| |
| return response_code in ["IN_PROGRESS", "SUCCESS"] |
| |
| def ex_clean_failed_deployment(self, node): |
| """ |
| Removes a node that has failed to deploy |
| |
| :param node: The failed node to clean |
| :type node: :class:`Node` or ``str`` |
| """ |
| |
| node_id = self._node_to_node_id(node) |
| request_elm = ET.Element("cleanServer", {"xmlns": TYPES_URN, "id": node_id}) |
| body = self.connection.request_with_orgId_api_2( |
| "server/cleanServer", method="POST", data=ET.tostring(request_elm) |
| ).object |
| response_code = findtext(body, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_customer_images(self, location=None): |
| """ |
| Return a list of customer imported images |
| |
| :param location: The target location |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :rtype: ``list`` of :class:`NodeImage` |
| """ |
| |
| params = {} |
| if location is not None: |
| params["datacenterId"] = self._location_to_location_id(location) |
| |
| return self._to_images( |
| self.connection.request_with_orgId_api_2("image/customerImage", params=params).object, |
| "customerImage", |
| ) |
| |
| def ex_get_base_image_by_id(self, id): |
| """ |
| Gets a Base image in the NTTC-CIS Cloud given the id |
| |
| :param id: The id of the image |
| :type id: ``str`` |
| |
| :rtype: :class:`NodeImage` |
| """ |
| |
| image = self.connection.request_with_orgId_api_2("image/osImage/%s" % id).object |
| return self._to_image(image) |
| |
| def ex_get_customer_image_by_id(self, id): |
| """ |
| Gets a Customer image in the NTTC-CIS Cloud given the id |
| |
| :param id: The id of the image |
| :type id: ``str`` |
| |
| :rtype: :class:`NodeImage` |
| """ |
| |
| image = self.connection.request_with_orgId_api_2("image/customerImage/%s" % id).object |
| return self._to_image(image) |
| |
| def ex_get_image_by_id(self, id): |
| """ |
| Gets a Base/Customer image in the NTTC-CIS Cloud given the id |
| |
| Note: This first checks the base image |
| If it is not a base image we check if it is a customer image |
| If it is not in either of these a NttCisAPIException |
| is thrown |
| |
| :param id: The id of the image |
| :type id: ``str`` |
| |
| :rtype: :class:`NodeImage` |
| """ |
| |
| try: |
| return self.ex_get_base_image_by_id(id) |
| except NttCisAPIException as e: |
| if e.code != "RESOURCE_NOT_FOUND": |
| raise e |
| return self.ex_get_customer_image_by_id(id) |
| |
| def ex_create_tag_key( |
| self, name, description=None, value_required=True, display_on_report=True |
| ): |
| """ |
| Creates a tag key in the NTTC-CIS Cloud |
| |
| :param name: The name of the tag key (required) |
| :type name: ``str`` |
| |
| :param description: The description of the tag key |
| :type description: ``str`` |
| |
| :param value_required: If a value is required for the tag |
| Tags themselves can be just a tag, |
| or be a key/value pair |
| :type value_required: ``bool`` |
| |
| :param display_on_report: Should this key show up on the usage reports |
| :type display_on_report: ``bool`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| create_tag_key = ET.Element("createTagKey", {"xmlns": TYPES_URN}) |
| ET.SubElement(create_tag_key, "name").text = name |
| if description is not None: |
| ET.SubElement(create_tag_key, "description").text = description |
| ET.SubElement(create_tag_key, "valueRequired").text = str(value_required).lower() |
| ET.SubElement(create_tag_key, "displayOnReport").text = str(display_on_report).lower() |
| response = self.connection.request_with_orgId_api_2( |
| "tag/createTagKey", method="POST", data=ET.tostring(create_tag_key) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_tag_keys(self, id=None, name=None, value_required=None, display_on_report=None): |
| """ |
| List tag keys in the NTTC-CIS Cloud |
| |
| :param id: Filter the list to the id of the tag key |
| :type id: ``str`` |
| |
| :param name: Filter the list to the name of the tag key |
| :type name: ``str`` |
| |
| :param value_required: Filter the list to if a value is required |
| for a tag key |
| :type value_required: ``bool`` |
| |
| :param display_on_report: Filter the list to if the tag key should |
| show up on usage reports |
| :type display_on_report: ``bool`` |
| |
| :rtype: ``list`` of :class:`NttCisTagKey` |
| """ |
| |
| params = {} |
| if id is not None: |
| params["id"] = id |
| if name is not None: |
| params["name"] = name |
| if value_required is not None: |
| params["valueRequired"] = str(value_required).lower() |
| if display_on_report is not None: |
| params["displayOnReport"] = str(display_on_report).lower() |
| |
| paged_result = self.connection.paginated_request_with_orgId_api_2( |
| "tag/tagKey", method="GET", params=params |
| ) |
| |
| tag_keys = [] |
| for result in paged_result: |
| tag_keys.extend(self._to_tag_keys(result)) |
| return tag_keys |
| |
| def ex_get_tag_key_by_id(self, id): |
| """ |
| Get a specific tag key by ID |
| |
| :param id: ID of the tag key you want (required) |
| :type id: ``str`` |
| |
| :rtype: :class:`NttCisTagKey` |
| """ |
| |
| tag_key = self.connection.request_with_orgId_api_2("tag/tagKey/%s" % id).object |
| return self._to_tag_key(tag_key) |
| |
| def ex_get_tag_key_by_name(self, name): |
| """ |
| NOTICE: Tag key is one of those instances where Libloud |
| handles the search of a list for the client code. |
| This behavior exists inconsistently across libcloud. |
| Get a specific tag key by Name |
| |
| :param name: Name of the tag key you want (required) |
| :type name: ``str`` |
| |
| :rtype: :class:`NttCisTagKey` |
| """ |
| |
| tag_keys = self.ex_list_tag_keys(name=name) |
| if len(tag_keys) != 1: |
| raise ValueError("No tags found with name %s" % name) |
| return tag_keys[0] |
| |
| def ex_modify_tag_key( |
| self, |
| tag_key, |
| name=None, |
| description=None, |
| value_required=None, |
| display_on_report=None, |
| ): |
| |
| """ |
| Modify a specific tag key |
| |
| :param tag_key: The tag key you want to modify (required) |
| :type tag_key: :class:`NttCisTagKey` or ``str`` |
| |
| :param name: Set to modifiy the name of the tag key |
| :type name: ``str`` |
| |
| :param description: Set to modify the description of the tag key |
| :type description: ``str`` |
| |
| :param value_required: Set to modify if a value is required for |
| the tag key |
| :type value_required: ``bool`` |
| |
| :param display_on_report: Set to modify if this tag key should display |
| on the usage reports |
| :type display_on_report: ``bool`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| tag_key_id = self._tag_key_to_tag_key_id(tag_key) |
| modify_tag_key = ET.Element("editTagKey", {"xmlns": TYPES_URN, "id": tag_key_id}) |
| if name is not None: |
| ET.SubElement(modify_tag_key, "name").text = name |
| if description is not None: |
| ET.SubElement(modify_tag_key, "description").text = description |
| if value_required is not None: |
| ET.SubElement(modify_tag_key, "valueRequired").text = str(value_required).lower() |
| if display_on_report is not None: |
| ET.SubElement(modify_tag_key, "displayOnReport").text = str(display_on_report).lower() |
| |
| response = self.connection.request_with_orgId_api_2( |
| "tag/editTagKey", method="POST", data=ET.tostring(modify_tag_key) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_remove_tag_key(self, tag_key): |
| """ |
| Modify a specific tag key |
| |
| :param tag_key: The tag key you want to remove (required) |
| :type tag_key: :class:`NttCisTagKey` or ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| tag_key_id = self._tag_key_to_tag_key_id(tag_key) |
| remove_tag_key = ET.Element("deleteTagKey", {"xmlns": TYPES_URN, "id": tag_key_id}) |
| response = self.connection.request_with_orgId_api_2( |
| "tag/deleteTagKey", method="POST", data=ET.tostring(remove_tag_key) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_apply_tag_to_asset(self, asset, tag_key, value=None): |
| """ |
| Apply a tag to a NTTC-CIS Asset |
| |
| :param asset: The asset to apply a tag to. (required) |
| :type asset: :class:`Node` or :class:`NodeImage` or |
| :class:`NttCisNewtorkDomain` or |
| :class:`NttCisVlan` or |
| :class:`NttCisPublicIpBlock` |
| |
| :param tag_key: The tag_key to apply to the asset. (required) |
| :type tag_key: :class:`NttCisTagKey` or ``str`` |
| |
| :param value: The value to be assigned to the tag key |
| This is only required if the :class:`NttCisTagKey` |
| requires it |
| :type value: ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| asset_type = self._get_tagging_asset_type(asset) |
| tag_key_name = self._tag_key_to_tag_key_name(tag_key) |
| |
| apply_tags = ET.Element("applyTags", {"xmlns": TYPES_URN}) |
| ET.SubElement(apply_tags, "assetType").text = asset_type |
| ET.SubElement(apply_tags, "assetId").text = asset.id |
| |
| tag_ele = ET.SubElement(apply_tags, "tag") |
| ET.SubElement(tag_ele, "tagKeyName").text = tag_key_name |
| if value is not None: |
| ET.SubElement(tag_ele, "value").text = value |
| |
| response = self.connection.request_with_orgId_api_2( |
| "tag/applyTags", method="POST", data=ET.tostring(apply_tags) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_remove_tag_from_asset(self, asset, tag_key): |
| """ |
| Remove a tag from an asset |
| |
| :param asset: The asset to remove a tag from. (required) |
| :type asset: :class:`Node` or :class:`NodeImage` or |
| :class:`NttCisNewtorkDomain` or |
| :class:`NttCisVlan` or |
| :class:`NttCisPublicIpBlock` |
| |
| :param tag_key: The tag key you want to remove (required) |
| :type tag_key: :class:`NttCisTagKey` or ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| asset_type = self._get_tagging_asset_type(asset) |
| tag_key_name = self._tag_key_to_tag_key_name(tag_key) |
| |
| apply_tags = ET.Element("removeTags", {"xmlns": TYPES_URN}) |
| ET.SubElement(apply_tags, "assetType").text = asset_type |
| ET.SubElement(apply_tags, "assetId").text = asset.id |
| ET.SubElement(apply_tags, "tagKeyName").text = tag_key_name |
| response = self.connection.request_with_orgId_api_2( |
| "tag/removeTags", method="POST", data=ET.tostring(apply_tags) |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_tags( |
| self, |
| asset_id=None, |
| asset_type=None, |
| location=None, |
| tag_key_name=None, |
| tag_key_id=None, |
| value=None, |
| value_required=None, |
| display_on_report=None, |
| ): |
| """ |
| List tags in the NTTC-CIS Cloud |
| |
| :param asset_id: Filter the list by asset id |
| :type asset_id: ``str`` |
| |
| :param asset_type: Filter the list by asset type |
| :type asset_type: ``str`` |
| |
| :param location: Filter the list by the assets location |
| :type location: :class:``NodeLocation`` or ``str`` |
| |
| :param tag_key_name: Filter the list by a tag key name |
| :type tag_key_name: ``str`` |
| |
| :param tag_key_id: Filter the list by a tag key id |
| :type tag_key_id: ``str`` |
| |
| :param value: Filter the list by a tag value |
| :type value: ``str`` |
| |
| :param value_required: Filter the list to if a value is required |
| for a tag |
| :type value_required: ``bool`` |
| |
| :param display_on_report: Filter the list to if the tag should |
| show up on usage reports |
| :type display_on_report: ``bool`` |
| |
| :rtype: ``list`` of :class:`NttCisTag` |
| """ |
| |
| params = {} |
| if asset_id is not None: |
| params["assetId"] = asset_id |
| if asset_type is not None: |
| params["assetType"] = asset_type |
| if location is not None: |
| params["datacenterId"] = self._location_to_location_id(location) |
| if tag_key_name is not None: |
| params["tagKeyName"] = tag_key_name |
| if tag_key_id is not None: |
| params["tagKeyId"] = tag_key_id |
| if value is not None: |
| params["value"] = value |
| if value_required is not None: |
| params["valueRequired"] = str(value_required).lower() |
| if display_on_report is not None: |
| params["displayOnReport"] = str(display_on_report).lower() |
| |
| paged_result = self.connection.paginated_request_with_orgId_api_2( |
| "tag/tag", method="GET", params=params |
| ) |
| |
| tags = [] |
| for result in paged_result: |
| tags.extend(self._to_tags(result)) |
| return tags |
| |
| def ex_summary_usage_report(self, start_date, end_date): |
| """ |
| Get summary usage information |
| |
| :param start_date: Start date for the report |
| :type start_date: ``str`` in format YYYY-MM-DD |
| |
| :param end_date: End date for the report |
| :type end_date: ``str`` in format YYYY-MM-DD |
| |
| :rtype: ``list`` of ``list`` |
| """ |
| |
| result = self.connection.raw_request_with_orgId_api_1( |
| "report/usage?startDate={}&endDate={}".format(start_date, end_date) |
| ) |
| return self._format_csv(result.response) |
| |
| def ex_detailed_usage_report(self, start_date, end_date): |
| """ |
| Get detailed usage information |
| |
| :param start_date: Start date for the report |
| :type start_date: ``str`` in format YYYY-MM-DD |
| |
| :param end_date: End date for the report |
| :type end_date: ``str`` in format YYYY-MM-DD |
| |
| :rtype: ``list`` of ``list`` |
| """ |
| |
| result = self.connection.raw_request_with_orgId_api_1( |
| "report/usageDetailed?startDate={}&endDate={}".format(start_date, end_date) |
| ) |
| return self._format_csv(result.response) |
| |
| def ex_software_usage_report(self, start_date, end_date): |
| """ |
| Get detailed software usage reports |
| |
| :param start_date: Start date for the report |
| :type start_date: ``str`` in format YYYY-MM-DD |
| |
| :param end_date: End date for the report |
| :type end_date: ``str`` in format YYYY-MM-DD |
| |
| :rtype: ``list`` of ``list`` |
| """ |
| |
| result = self.connection.raw_request_with_orgId_api_1( |
| "report/usageSoftwareUnits?startDate={}&endDate={}".format(start_date, end_date) |
| ) |
| return self._format_csv(result.response) |
| |
| def ex_audit_log_report(self, start_date, end_date): |
| """ |
| Get audit log report |
| |
| :param start_date: Start date for the report |
| :type start_date: ``str`` in format YYYY-MM-DD |
| |
| :param end_date: End date for the report |
| :type end_date: ``str`` in format YYYY-MM-DD |
| |
| :rtype: ``list`` of ``list`` |
| """ |
| |
| result = self.connection.raw_request_with_orgId_api_1( |
| "auditlog?startDate={}&endDate={}".format(start_date, end_date) |
| ) |
| return self._format_csv(result.response) |
| |
| def ex_backup_usage_report(self, start_date, end_date, location): |
| """ |
| Get audit log report |
| |
| :param start_date: Start date for the report |
| :type start_date: ``str`` in format YYYY-MM-DD |
| |
| :param end_date: End date for the report |
| :type end_date: ``str`` in format YYYY-MM-DD |
| |
| :keyword location: Filters the node list to nodes that are |
| located in this location |
| :type location: :class:`NodeLocation` or ``str`` |
| |
| :rtype: ``list`` of ``list`` |
| """ |
| |
| datacenter_id = self._location_to_location_id(location) |
| result = self.connection.raw_request_with_orgId_api_1( |
| "backup/detailedUsageReport?datacenterId=%s&fromDate=%s&toDate=%s" |
| % (datacenter_id, start_date, end_date) |
| ) |
| return self._format_csv(result.response) |
| |
| def ex_list_ip_address_list(self, ex_network_domain): |
| """ |
| List IP Address List by network domain ID specified |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| networkDomainName][0] |
| >>> |
| >>> # List IP Address List of network domain |
| >>> ipaddresslist_list = driver.ex_list_ip_address_list( |
| >>> ex_network_domain=my_network_domain) |
| >>> pprint(ipaddresslist_list) |
| |
| :param ex_network_domain: The network domain or network domain ID |
| :type ex_network_domain: :class:`NttCisNetworkDomain` or 'str' |
| |
| :return: a list of NttCisIpAddressList objects |
| :rtype: ``list`` of :class:`NttCisIpAddressList` |
| """ |
| |
| params = {"networkDomainId": self._network_domain_to_network_domain_id(ex_network_domain)} |
| response = self.connection.request_with_orgId_api_2( |
| "network/ipAddressList", params=params |
| ).object |
| return self._to_ip_address_lists(response) |
| |
| def ex_get_ip_address_list(self, ex_network_domain, ex_ip_address_list_name): |
| """ |
| Get IP Address List by name in network domain specified |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| networkDomainName][0] |
| >>> |
| >>> # Get IP Address List by Name |
| >>> ipaddresslist_list_by_name = driver.ex_get_ip_address_list( |
| >>> ex_network_domain=my_network_domain, |
| >>> ex_ip_address_list_name='My_IP_AddressList_1') |
| >>> pprint(ipaddresslist_list_by_name) |
| |
| |
| :param ex_network_domain: (required) The network domain or network |
| domain ID in which ipaddresslist resides. |
| :type ex_network_domain: :class:`NttCisNetworkDomain` or 'str' |
| |
| :param ex_ip_address_list_name: (required) Get 'IP Address List' by |
| name |
| :type ex_ip_address_list_name: :``str`` |
| |
| :return: a list of NttCisIpAddressList objects |
| :rtype: ``list`` of :class:`NttCisIpAddressList` |
| """ |
| |
| ip_address_lists = self.ex_list_ip_address_list(ex_network_domain) |
| return list(filter(lambda x: x.name == ex_ip_address_list_name, ip_address_lists)) |
| |
| def ex_create_ip_address_list( |
| self, |
| ex_network_domain, |
| name, |
| description, |
| ip_version, |
| ip_address_collection, |
| child_ip_address_list=None, |
| ): |
| """ |
| Create IP Address List. IP Address list. |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> from libcloud.common.nttcis import NttCisIpAddress |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| networkDomainName][0] |
| >>> |
| >>> # IP Address collection |
| >>> ipAddress_1 = NttCisIpAddress(begin='190.2.2.100') |
| >>> ipAddress_2 = NttCisIpAddress(begin='190.2.2.106', |
| end='190.2.2.108') |
| >>> ipAddress_3 = NttCisIpAddress(begin='190.2.2.0', |
| prefix_size='24') |
| >>> ip_address_collection = [ipAddress_1, ipAddress_2, ipAddress_3] |
| >>> |
| >>> # Create IPAddressList |
| >>> result = driver.ex_create_ip_address_list( |
| >>> ex_network_domain=my_network_domain, |
| >>> name='My_IP_AddressList_2', |
| >>> ip_version='IPV4', |
| >>> description='Test only', |
| >>> ip_address_collection=ip_address_collection, |
| >>> child_ip_address_list='08468e26-eeb3-4c3d-8ff2-5351fa6d8a04' |
| >>> ) |
| >>> |
| >>> pprint(result) |
| |
| |
| :param ex_network_domain: The network domain or network domain ID |
| :type ex_network_domain: :class:`NttCisNetworkDomain` or 'str' |
| |
| :param name: IP Address List Name (required) |
| :type name: :``str`` |
| |
| :param description: IP Address List Description (optional) |
| :type description: :``str`` |
| |
| :param ip_version: IP Version of ip address (required) |
| :type ip_version: :``str`` |
| |
| :param ip_address_collection: List of IP Address. At least one |
| ipAddress element or one |
| childIpAddressListId element must |
| be provided. |
| :type ip_address_collection: :``str`` |
| |
| :param child_ip_address_list: Child IP Address List or id to be |
| included in this IP Address List. |
| At least one ipAddress or |
| one childIpAddressListId |
| must be provided. |
| :type child_ip_address_list: |
| :class:'NttCisChildIpAddressList` or `str`` |
| |
| :return: a list of NttCisIpAddressList objects |
| :rtype: ``list`` of :class:`NttCisIpAddressList` |
| """ |
| |
| if ip_address_collection is None and child_ip_address_list is None: |
| raise ValueError( |
| "At least one ipAddress element or one " |
| "childIpAddressListId element must be " |
| "provided." |
| ) |
| |
| create_ip_address_list = ET.Element("createIpAddressList", {"xmlns": TYPES_URN}) |
| ET.SubElement( |
| create_ip_address_list, "networkDomainId" |
| ).text = self._network_domain_to_network_domain_id(ex_network_domain) |
| |
| ET.SubElement(create_ip_address_list, "name").text = name |
| |
| ET.SubElement(create_ip_address_list, "description").text = description |
| |
| ET.SubElement(create_ip_address_list, "ipVersion").text = ip_version |
| |
| for ip in ip_address_collection: |
| ip_address = ET.SubElement( |
| create_ip_address_list, |
| "ipAddress", |
| ) |
| ip_address.set("begin", ip.begin) |
| |
| if ip.end: |
| ip_address.set("end", ip.end) |
| |
| if ip.prefix_size: |
| ip_address.set("prefixSize", ip.prefix_size) |
| |
| if child_ip_address_list is not None: |
| ET.SubElement( |
| create_ip_address_list, "childIpAddressListId" |
| ).text = self._child_ip_address_list_to_child_ip_address_list_id(child_ip_address_list) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/createIpAddressList", |
| method="POST", |
| data=ET.tostring(create_ip_address_list), |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_edit_ip_address_list( |
| self, |
| ex_ip_address_list, |
| description=None, |
| ip_address_collection=None, |
| child_ip_address_lists=None, |
| ): |
| """ |
| Edit IP Address List. IP Address list. |
| Bear in mind you cannot add ip addresses to |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> from libcloud.common.NTTCIS import NttCisIpAddress |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # IP Address collection |
| >>> ipAddress_1 = NttCisIpAddress(begin='190.2.2.100') |
| >>> ipAddress_2 = NttCisIpAddress(begin='190.2.2.106', |
| >>> end='190.2.2.108') |
| >>> ipAddress_3 = NttCisIpAddress( |
| >>> begin='190.2.2.0', prefix_size='24') |
| >>> ip_address_collection = [ipAddress_1, ipAddress_2, ipAddress_3] |
| >>> |
| >>> # Edit IP Address List |
| >>> ip_address_list_id = '5e7c323f-c885-4e4b-9a27-94c44217dbd3' |
| >>> result = driver.ex_edit_ip_address_list( |
| >>> ex_ip_address_list=ip_address_list_id, |
| >>> description="Edit Test", |
| >>> ip_address_collection=ip_address_collection, |
| >>> child_ip_address_lists=None |
| >>> ) |
| >>> pprint(result) |
| |
| :param ex_ip_address_list: (required) IpAddressList object or |
| IpAddressList ID |
| :type ex_ip_address_list: :class:'NttCisIpAddressList' |
| or ``str`` |
| |
| :param description: IP Address List Description |
| :type description: :``str`` |
| |
| :param ip_address_collection: List of IP Address |
| :type ip_address_collection: ''list'' of |
| :class:'NttCisIpAddressList' |
| |
| :param child_ip_address_lists: Child IP Address List or id to be |
| included in this IP Address List |
| :type child_ip_address_lists: ``list`` of |
| :class:'NttCisChildIpAddressList' |
| or ``str`` |
| |
| :return: a list of NttCisIpAddressList objects |
| :rtype: ``list`` of :class:`NttCisIpAddressList` |
| """ |
| |
| edit_ip_address_list = ET.Element( |
| "editIpAddressList", |
| { |
| "xmlns": TYPES_URN, |
| "id": self._ip_address_list_to_ip_address_list_id(ex_ip_address_list), |
| "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", |
| }, |
| ) |
| if description is not None: |
| if description != "nil": |
| ET.SubElement(edit_ip_address_list, "description").text = description |
| else: |
| ET.SubElement(edit_ip_address_list, "description", {"xsi:nil": "true"}) |
| |
| if ip_address_collection is not None: |
| for ip in ip_address_collection: |
| ip_address = ET.SubElement( |
| edit_ip_address_list, |
| "ipAddress", |
| ) |
| ip_address.set("begin", ip.begin) |
| |
| if ip.end: |
| ip_address.set("end", ip.end) |
| |
| if ip.prefix_size: |
| ip_address.set("prefixSize", ip.prefix_size) |
| |
| if child_ip_address_lists is not None: |
| ET.SubElement( |
| edit_ip_address_list, "childIpAddressListId" |
| ).text = self._child_ip_address_list_to_child_ip_address_list_id(child_ip_address_lists) |
| else: |
| ET.SubElement(edit_ip_address_list, "childIpAddressListId", {"xsi:nil": "true"}) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/editIpAddressList", |
| method="POST", |
| data=ET.tostring(edit_ip_address_list), |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_delete_ip_address_list(self, ex_ip_address_list): |
| """ |
| Delete IP Address List by ID |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> ip_address_list_id = '5e7c323f-c885-4e4b-9a27-94c44217dbd3' |
| >>> result = driver.ex_delete_ip_address_list(ip_address_list_id) |
| >>> pprint(result) |
| |
| :param ex_ip_address_list: IP Address List object or IP Address |
| List ID (required) |
| :type ex_ip_address_list: :class:'NttCisIpAddressList' |
| or ``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| delete_ip_address_list = ET.Element( |
| "deleteIpAddressList", |
| { |
| "xmlns": TYPES_URN, |
| "id": self._ip_address_list_to_ip_address_list_id(ex_ip_address_list), |
| }, |
| ) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/deleteIpAddressList", |
| method="POST", |
| data=ET.tostring(delete_ip_address_list), |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_list_portlist(self, ex_network_domain): |
| """ |
| List Portlist by network domain ID specified |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| >>> networkDomainName][0] |
| >>> |
| >>> # List portlist |
| >>> portLists = driver.ex_list_portlist( |
| >>> ex_network_domain=my_network_domain) |
| >>> pprint(portLists) |
| >>> |
| |
| :param ex_network_domain: The network domain or network domain ID |
| :type ex_network_domain: :class:`NttCisNetworkDomain` or 'str' |
| |
| :return: a list of NttCisPortList objects |
| :rtype: ``list`` of :class:`NttCisPortList` |
| """ |
| |
| params = {"networkDomainId": self._network_domain_to_network_domain_id(ex_network_domain)} |
| response = self.connection.request_with_orgId_api_2( |
| "network/portList", params=params |
| ).object |
| return self._to_port_lists(response) |
| |
| def ex_get_portlist(self, ex_portlist_id): |
| """ |
| Get Port List |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get specific portlist by ID |
| >>> portlist_id = '27dd8c66-80ff-496b-9f54-2a3da2fe679e' |
| >>> portlist = driver.ex_get_portlist(portlist_id) |
| >>> pprint(portlist) |
| |
| :param ex_portlist_id: The ex_port_list or ex_port_list ID |
| :type ex_portlist_id: :class:`NttCisNetworkDomain` or 'str' |
| |
| :return: NttCisPortList object |
| :rtype: :class:`NttCisPort` |
| """ |
| |
| url_path = "network/portList/%s" % ex_portlist_id |
| response = self.connection.request_with_orgId_api_2(url_path).object |
| return self._to_port_list(response) |
| |
| def ex_create_portlist( |
| self, |
| ex_network_domain, |
| name, |
| description, |
| port_collection, |
| child_portlist_list=None, |
| ): |
| """ |
| Create Port List. |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> from libcloud.common.nttcis import NttCisPort |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Get location |
| >>> location = driver.ex_get_location_by_id(id='AU9') |
| >>> |
| >>> # Get network domain by location |
| >>> networkDomainName = "Baas QA" |
| >>> network_domains = driver.ex_list_network_domains(location=location) |
| >>> my_network_domain = [d for d in network_domains if d.name == |
| networkDomainName][0] |
| >>> |
| >>> # Port Collection |
| >>> port_1 = DimensionDataPort(begin='1000') |
| >>> port_2 = DimensionDataPort(begin='1001', end='1003') |
| >>> port_collection = [port_1, port_2] |
| >>> |
| >>> # Create Port List |
| >>> new_portlist = driver.ex_create_portlist( |
| >>> ex_network_domain=my_network_domain, |
| >>> name='MyPortListX', |
| >>> description="Test only", |
| >>> port_collection=port_collection, |
| >>> child_portlist_list={'a9cd4984-6ff5-4f93-89ff-8618ab642bb9'} |
| >>> ) |
| >>> pprint(new_portlist) |
| |
| :param ex_network_domain: (required) The network domain in |
| which to create PortList. Provide |
| networkdomain object or its id. |
| :type ex_network_domain: :``str`` |
| |
| :param name: Port List Name |
| :type name: :``str`` |
| |
| :param description: IP Address List Description |
| :type description: :``str`` |
| |
| :param port_collection: List of Port Address |
| :type port_collection: :``str`` |
| |
| :param child_portlist_list: List of Child Portlist to be |
| included in this Port List |
| :type child_portlist_list: :``str`` or ''list of |
| :class:'NttCisChildPortList' |
| |
| :return: result of operation |
| :rtype: ``bool`` |
| """ |
| |
| new_port_list = ET.Element("createPortList", {"xmlns": TYPES_URN}) |
| ET.SubElement( |
| new_port_list, "networkDomainId" |
| ).text = self._network_domain_to_network_domain_id(ex_network_domain) |
| |
| ET.SubElement(new_port_list, "name").text = name |
| |
| ET.SubElement(new_port_list, "description").text = description |
| |
| for port in port_collection: |
| p = ET.SubElement(new_port_list, "port") |
| p.set("begin", port.begin) |
| |
| if port.end: |
| p.set("end", port.end) |
| |
| if child_portlist_list is not None: |
| for child in child_portlist_list: |
| ET.SubElement( |
| new_port_list, "childPortListId" |
| ).text = self._child_port_list_to_child_port_list_id(child) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/createPortList", method="POST", data=ET.tostring(new_port_list) |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_edit_portlist( |
| self, |
| ex_portlist, |
| description=None, |
| port_collection=None, |
| child_portlist_list=None, |
| ): |
| """ |
| Edit Port List. |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> from libcloud.common.NTTCIS import DimensionDataPort |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Port Collection |
| >>> port_1 = DimensionDataPort(begin='4200') |
| >>> port_2 = DimensionDataPort(begin='4201', end='4210') |
| >>> port_collection = [port_1, port_2] |
| >>> |
| >>> # Edit Port List |
| >>> editPortlist = driver.ex_get_portlist( |
| '27dd8c66-80ff-496b-9f54-2a3da2fe679e') |
| >>> |
| >>> result = driver.ex_edit_portlist( |
| >>> ex_portlist=editPortlist.id, |
| >>> description="Make Changes in portlist", |
| >>> port_collection=port_collection, |
| >>> child_portlist_list={'a9cd4984-6ff5-4f93-89ff-8618ab642bb9'} |
| >>> ) |
| >>> pprint(result) |
| |
| :param ex_portlist: Port List to be edited |
| (required) |
| :type ex_portlist: :``str`` or :class:'DNttCisPortList' |
| |
| :param description: Port List Description |
| :type description: :``str`` |
| |
| :param port_collection: List of Ports |
| :type port_collection: :``str`` |
| |
| :param child_portlist_list: Child PortList to be included in |
| this IP Address List |
| :type child_portlist_list: :``list`` of |
| :class'NttCisChildPortList' |
| or ''str'' |
| |
| :return: a list of NttCisPortList objects |
| :rtype: ``list`` of :class:`NttCisPortList` |
| """ |
| |
| existing_port_address_list = ET.Element( |
| "editPortList", |
| { |
| "id": self._port_list_to_port_list_id(ex_portlist), |
| "xmlns": TYPES_URN, |
| "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", |
| }, |
| ) |
| if description is not None: |
| if description != "nil": |
| ET.SubElement(existing_port_address_list, "description").text = description |
| else: |
| ET.SubElement(existing_port_address_list, "description", {"xsi:nil": "true"}) |
| |
| if port_collection is not None: |
| for port in port_collection: |
| p = ET.SubElement(existing_port_address_list, "port") |
| p.set("begin", port.begin) |
| |
| if port.end: |
| p.set("end", port.end) |
| |
| if child_portlist_list is not None: |
| for child in child_portlist_list: |
| ET.SubElement( |
| existing_port_address_list, "childPortListId" |
| ).text = self._child_port_list_to_child_port_list_id(child) |
| else: |
| ET.SubElement(existing_port_address_list, "childPortListId", {"xsi:nil": "true"}) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/editPortList", |
| method="POST", |
| data=ET.tostring(existing_port_address_list), |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_delete_portlist(self, ex_portlist): |
| """ |
| Delete Port List |
| |
| >>> from pprint import pprint |
| >>> from libcloud.compute.types import Provider |
| >>> from libcloud.compute.providers import get_driver |
| >>> import libcloud.security |
| >>> |
| >>> # Get NTTC-CIS driver |
| >>> libcloud.security.VERIFY_SSL_CERT = True |
| >>> cls = get_driver(Provider.NTTCIS) |
| >>> driver = cls('myusername','mypassword', region='dd-au') |
| >>> |
| >>> # Delete Port List |
| >>> portlist_id = '157531ce-77d4-493c-866b-d3d3fc4a912a' |
| >>> response = driver.ex_delete_portlist(portlist_id) |
| >>> pprint(response) |
| |
| :param ex_portlist: Port List to be deleted |
| :type ex_portlist: :``str`` or :class:'NttCisPortList' |
| |
| :rtype: ``bool`` |
| """ |
| |
| delete_port_list = ET.Element( |
| "deletePortList", |
| {"xmlns": TYPES_URN, "id": self._port_list_to_port_list_id(ex_portlist)}, |
| ) |
| |
| response = self.connection.request_with_orgId_api_2( |
| "network/deletePortList", method="POST", data=ET.tostring(delete_port_list) |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_exchange_nic_vlans(self, nic_id_1, nic_id_2): |
| """ |
| Exchange NIC Vlans |
| |
| :param nic_id_1: Nic ID 1 |
| :type nic_id_1: :``str`` |
| |
| :param nic_id_2: Nic ID 2 |
| :type nic_id_2: :``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| exchange_elem = ET.Element("urn:exchangeNicVlans", {"xmlns:urn": TYPES_URN}) |
| |
| ET.SubElement(exchange_elem, "urn:nicId1").text = nic_id_1 |
| ET.SubElement(exchange_elem, "urn:nicId2").text = nic_id_2 |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/exchangeNicVlans", method="POST", data=ET.tostring(exchange_elem) |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_change_nic_network_adapter(self, nic_id, network_adapter_name): |
| """ |
| Change network adapter of a NIC on a cloud server |
| |
| :param nic_id: Nic ID |
| :type nic_id: :``str`` |
| |
| :param network_adapter_name: Network adapter name |
| :type network_adapter_name: :``str`` |
| |
| :rtype: ``bool`` |
| """ |
| |
| change_elem = ET.Element("changeNetworkAdapter", {"nicId": nic_id, "xmlns": TYPES_URN}) |
| |
| ET.SubElement(change_elem, "networkAdapter").text = network_adapter_name |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/changeNetworkAdapter", method="POST", data=ET.tostring(change_elem) |
| ).object |
| |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_create_node_uncustomized( |
| self, |
| name, |
| image, |
| ex_network_domain, |
| ex_is_started=True, |
| ex_description=None, |
| ex_cluster_id=None, |
| ex_cpu_specification=None, |
| ex_memory_gb=None, |
| ex_primary_nic_private_ipv4=None, |
| ex_primary_nic_vlan=None, |
| ex_primary_nic_network_adapter=None, |
| ex_additional_nics=None, |
| ex_disks=None, |
| ex_tagid_value_pairs=None, |
| ex_tagname_value_pairs=None, |
| ): |
| |
| """ |
| This MCP 2.0 only function deploys a new Cloud Server from a |
| CloudControl compatible Server Image, which does not utilize |
| VMware Guest OS Customization process. |
| |
| Create Node in MCP2 Data Center |
| |
| :keyword name: (required) String with a name for this new node |
| :type name: ``str`` |
| |
| :keyword image: (UUID of the Server Image being used as the target |
| for the new Server deployment. The source Server |
| Image (OS Image or Customer Image) must have |
| osCustomization set to true. See Get/List OS |
| Image(s) and Get/List Customer Image(s). |
| :type image: :class:`NodeImage` or ``str`` |
| |
| |
| :keyword ex_network_domain: (required) Network Domain or Network |
| Domain ID to create the node |
| :type ex_network_domain: :class:`NttCisNetworkDomain` |
| or ``str`` |
| |
| :keyword ex_description: (optional) description for this node |
| :type ex_description: ``str`` |
| |
| :keyword ex_cluster_id: (optional) For multiple cluster |
| environments, it is possible to set a destination cluster for the new |
| Customer Image. Note that performance of this function is optimal when |
| either the Server cluster and destination are the same or when shared |
| data storage is in place for the multiple clusters. |
| :type ex_cluster_id: ``str`` |
| |
| |
| :keyword ex_primary_nic_private_ipv4: Provide private IPv4. Ignore |
| if ex_primary_nic_vlan is |
| provided. Use one or the |
| other. Not both. |
| :type ex_primary_nic_private_ipv4: :``str`` |
| |
| :keyword ex_primary_nic_vlan: Provide VLAN for the node if |
| ex_primary_nic_private_ipv4 NOT |
| provided. One or the other. Not both. |
| :type ex_primary_nic_vlan: :class: NttCisVlan or ``str`` |
| |
| :keyword ex_primary_nic_network_adapter: (Optional) Default value |
| for the Operating System |
| will be used if leave |
| empty. Example: "E1000". |
| :type ex_primary_nic_network_adapter: :``str`` |
| |
| :keyword ex_additional_nics: (optional) List |
| :class:'NttCisNic' or None |
| :type ex_additional_nics: ``list`` of :class:'NttCisNic' |
| or ``str`` |
| |
| :keyword ex_memory_gb: (optional) The amount of memory in GB for |
| the server Can be used to override the |
| memory value inherited from the source |
| Server Image. |
| :type ex_memory_gb: ``int`` |
| |
| :keyword ex_cpu_specification: (optional) The spec of CPU to deploy |
| :type ex_cpu_specification: |
| :class:`NttCisServerCpuSpecification` |
| |
| :keyword ex_is_started: (required) Start server after creation. |
| Default is set to true. |
| :type ex_is_started: ``bool`` |
| |
| :keyword ex_disks: (optional) NttCis disks. Optional disk |
| elements can be used to define the disk speed |
| that each disk on the Server; inherited from the |
| source Server Image will be deployed to. It is |
| not necessary to include a diskelement for every |
| disk; only those that you wish to set a disk |
| speed value for. Note that scsiId 7 cannot be |
| used.Up to 13 disks can be present in addition to |
| the required OS disk on SCSI ID 0. Refer to |
| https://docs.mcp-services.net/x/UwIu for disk |
| |
| :type ex_disks: List or tuple of :class:'NttCisServerDisk` |
| |
| :keyword ex_tagid_value_pairs: |
| (Optional) up to 10 tag elements may be provided. |
| A combination of tagById and tag name cannot be |
| supplied in the same request. |
| Note: ex_tagid_value_pairs and |
| ex_tagname_value_pairs is |
| mutually exclusive. Use one or other. |
| |
| :type ex_tagname_value_pairs: ``dict``. Value can be None. |
| |
| :keyword ex_tagname_value_pairs: |
| (Optional) up to 10 tag elements may be provided. |
| A combination of tagById and tag name cannot be |
| supplied in the same request. |
| Note: ex_tagid_value_pairs and |
| ex_tagname_value_pairs is |
| mutually exclusive. Use one or other. |
| |
| :type ex_tagname_value_pairs: ``dict```. |
| |
| :return: The newly created :class:`Node`. |
| :rtype: :class:`Node` |
| """ |
| |
| # Unsupported for version lower than 2.4 |
| if LooseVersion(self.connection.active_api_version) < LooseVersion("2.4"): |
| raise Exception("This feature is NOT supported in " "earlier api version of 2.4") |
| |
| # Default start to true if input is invalid |
| if not isinstance(ex_is_started, bool): |
| ex_is_started = True |
| print("Warning: ex_is_started input value is invalid. Default" "to True") |
| |
| server_uncustomized_elm = ET.Element("deployUncustomizedServer", {"xmlns": TYPES_URN}) |
| ET.SubElement(server_uncustomized_elm, "name").text = name |
| ET.SubElement(server_uncustomized_elm, "description").text = ex_description |
| image_id = self._image_to_image_id(image) |
| ET.SubElement(server_uncustomized_elm, "imageId").text = image_id |
| |
| if ex_cluster_id: |
| dns_elm = ET.SubElement(server_uncustomized_elm, "primaryDns") |
| dns_elm.text = ex_cluster_id |
| |
| if ex_is_started is not None: |
| ET.SubElement(server_uncustomized_elm, "start").text = str(ex_is_started).lower() |
| |
| if ex_cpu_specification is not None: |
| cpu = ET.SubElement(server_uncustomized_elm, "cpu") |
| cpu.set("speed", ex_cpu_specification.performance) |
| cpu.set("count", str(ex_cpu_specification.cpu_count)) |
| cpu.set("coresPerSocket", str(ex_cpu_specification.cores_per_socket)) |
| |
| if ex_memory_gb is not None: |
| ET.SubElement(server_uncustomized_elm, "memoryGb").text = str(ex_memory_gb) |
| |
| if ex_primary_nic_private_ipv4 is None and ex_primary_nic_vlan is None: |
| raise ValueError( |
| "Missing argument. Either " |
| "ex_primary_nic_private_ipv4 or " |
| "ex_primary_nic_vlan " |
| "must be specified." |
| ) |
| |
| if ex_primary_nic_private_ipv4 is not None and ex_primary_nic_vlan is not None: |
| raise ValueError( |
| "Either ex_primary_nic_private_ipv4 or " |
| "ex_primary_nic_vlan " |
| "be specified. Not both." |
| ) |
| |
| network_elm = ET.SubElement(server_uncustomized_elm, "networkInfo") |
| |
| net_domain_id = self._network_domain_to_network_domain_id(ex_network_domain) |
| network_elm.set("networkDomainId", net_domain_id) |
| |
| pri_nic = ET.SubElement(network_elm, "primaryNic") |
| |
| if ex_primary_nic_private_ipv4 is not None: |
| ET.SubElement(pri_nic, "privateIpv4").text = ex_primary_nic_private_ipv4 |
| |
| if ex_primary_nic_vlan is not None: |
| vlan_id = self._vlan_to_vlan_id(ex_primary_nic_vlan) |
| ET.SubElement(pri_nic, "vlanId").text = vlan_id |
| |
| if ex_primary_nic_network_adapter is not None: |
| ET.SubElement(pri_nic, "networkAdapter").text = ex_primary_nic_network_adapter |
| |
| if isinstance(ex_additional_nics, (list, tuple)): |
| for nic in ex_additional_nics: |
| additional_nic = ET.SubElement(network_elm, "additionalNic") |
| |
| if nic.private_ip_v4 is None and nic.vlan is None: |
| raise ValueError( |
| "Either a vlan or private_ip_v4 " |
| "must be specified for each " |
| "additional nic." |
| ) |
| |
| if nic.private_ip_v4 is not None and nic.vlan is not None: |
| raise ValueError( |
| "Either a vlan or private_ip_v4 " |
| "must be specified for each " |
| "additional nic. Not both." |
| ) |
| |
| if nic.private_ip_v4 is not None: |
| ET.SubElement(additional_nic, "privateIpv4").text = nic.private_ip_v4 |
| |
| if nic.vlan is not None: |
| vlan_id = self._vlan_to_vlan_id(nic.vlan) |
| ET.SubElement(additional_nic, "vlanId").text = vlan_id |
| |
| if nic.network_adapter_name is not None: |
| ET.SubElement(additional_nic, "networkAdapter").text = nic.network_adapter_name |
| elif ex_additional_nics is not None: |
| raise TypeError("ex_additional_NICs must be None or tuple/list") |
| |
| if isinstance(ex_disks, (list, tuple)): |
| for disk in ex_disks: |
| disk_elm = ET.SubElement(server_uncustomized_elm, "disk") |
| disk_elm.set("scsiId", disk.scsi_id) |
| disk_elm.set("speed", disk.speed) |
| elif ex_disks is not None: |
| raise TypeError("ex_disks must be None or tuple/list") |
| |
| # tagid and tagname value pair should not co-exists |
| if ex_tagid_value_pairs is not None and ex_tagname_value_pairs is not None: |
| raise ValueError( |
| "ex_tagid_value_pairs and ex_tagname_value_pairs" |
| "is mutually exclusive. Use one or the other." |
| ) |
| |
| # Tag by ID |
| if ex_tagid_value_pairs is not None: |
| if not isinstance(ex_tagid_value_pairs, dict): |
| raise ValueError("ex_tagid_value_pairs must be a dictionary.") |
| |
| if sys.version_info[0] < 3: |
| tagid_items = ex_tagid_value_pairs.iteritems() |
| else: |
| tagid_items = ex_tagid_value_pairs.items() |
| |
| for k, v in tagid_items: |
| tag_elem = ET.SubElement(server_uncustomized_elm, "tagById") |
| ET.SubElement(tag_elem, "tagKeyId").text = k |
| |
| if v is not None: |
| ET.SubElement(tag_elem, "value").text = v |
| |
| if ex_tagname_value_pairs is not None: |
| if not isinstance(ex_tagname_value_pairs, dict): |
| raise ValueError("ex_tagname_value_pairs must be a dictionary") |
| |
| if sys.version_info[0] < 3: |
| tags_items = ex_tagname_value_pairs.iteritems() |
| else: |
| tags_items = ex_tagname_value_pairs.items() |
| |
| for k, v in tags_items: |
| tag_name_elem = ET.SubElement(server_uncustomized_elm, "tag") |
| ET.SubElement(tag_name_elem, "tagKeyName").text = k |
| |
| if v is not None: |
| ET.SubElement(tag_name_elem, "value").text = v |
| |
| response = self.connection.request_with_orgId_api_2( |
| "server/deployUncustomizedServer", |
| method="POST", |
| data=ET.tostring(server_uncustomized_elm), |
| ).object |
| |
| node_id = None |
| for info in findall(response, "info", TYPES_URN): |
| if info.get("name") == "serverId": |
| node_id = info.get("value") |
| |
| new_node = self.ex_get_node_by_id(node_id) |
| |
| return new_node |
| |
| # DRS methods |
| def ex_create_consistency_group( |
| self, |
| name, |
| journal_size_gb, |
| source_server_id, |
| target_server_id, |
| description=None, |
| ): |
| """ |
| Create a consistency group |
| |
| :param name: Name of consistency group |
| :type name: ``str`` |
| |
| :param journal_size_gb: Journal size in GB |
| :type journal_size_gb: ``str`` |
| |
| :param source_server_id: Id of the server to copy |
| :type source_server_id: ``str`` |
| |
| :param target_server_id: Id of the server to receive the copy |
| :type: target_server_id: ``str`` |
| |
| :param description: (Optional) Description of consistency group |
| :type: description: ``str`` |
| |
| :rtype: :class:`NttCisConsistencyGroup` |
| """ |
| |
| consistency_group_elm = ET.Element("createConsistencyGroup", {"xmlns": TYPES_URN}) |
| ET.SubElement(consistency_group_elm, "name").text = name |
| if description is not None: |
| ET.SubElement(consistency_group_elm, "description").text = description |
| ET.SubElement(consistency_group_elm, "journalSizeGb").text = journal_size_gb |
| server_pair = ET.SubElement(consistency_group_elm, "serverPair") |
| ET.SubElement(server_pair, "sourceServerId").text = source_server_id |
| ET.SubElement(server_pair, "targetServerId").text = target_server_id |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/createConsistencyGroup", |
| method="POST", |
| data=ET.tostring(consistency_group_elm), |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| @get_params |
| def ex_list_consistency_groups(self, params={}): |
| """ |
| Functions takes a named parameter that must be one of the following |
| :param params: A dictionary composed of one of the following keys |
| and a value |
| * target_data_center_id= |
| * source_network_domain_id= |
| * target_network_domain_id= |
| * source_server_id= |
| * target_server_id= |
| * name= |
| * state= |
| * operation_status= |
| * drs_infrastructure_status= |
| :rtype: `list` of :class: `NttCisConsistencyGroup` |
| """ |
| |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/consistencyGroup", params=params |
| ).object |
| cgs = self._to_consistency_groups(response) |
| return cgs |
| |
| def ex_get_consistency_group(self, consistency_group_id): |
| """ |
| Retrieves a Consistency by it's id and is more efficient thatn listing |
| all consistency groups and filtering that result. |
| |
| :param consistency_group_id: An id of a consistency group |
| :type consistency_group_id: ``str`` |
| |
| :rtype: :class:`NttCisConsistencygroup` |
| """ |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/consistencyGroup/%s" % consistency_group_id |
| ).object |
| cg = self._to_process(response) |
| return cg |
| |
| def ex_list_consistency_group_snapshots( |
| self, consistency_group_id, create_time_min=None, create_time_max=None |
| ): |
| """ |
| Optional parameters identify the date of creation of Consistency Group |
| snapshots in *XML Schema (XSD) date time format. Best used as a |
| combination of createTime.MIN and createTime.MAX. If neither is |
| provided then all snapshots up to the possible maximum of 1014 |
| will be returned. If MIN is provided by itself, all snapshots |
| between the time specified by MIN and the point in time of |
| execution will be returned. If MAX is provided by itself, |
| then all snapshots up to that point in time (up to the |
| maximum number of 1014) will be returned. MIN and MAX are |
| inclusive for this API function |
| |
| :param consistency_group_id: The id of consistency group |
| :type consistency_group_id: ``str`` |
| |
| :param create_time_min: (Optional) in form YYYY-MM-DDT00:00.00.00Z or |
| substitute time offset for Z, i.e, |
| -05:00 |
| :type create_time_min: ``str`` |
| |
| :param create_time_max: (Optional) in form YYYY-MM-DDT00:00:00.000Z or |
| substitute time offset for Z, i.e, |
| -05:00 |
| :type create_time_max: ``str`` |
| |
| :rtype: `list` of :class:`NttCisSnapshots` |
| """ |
| |
| if create_time_min is None and create_time_max is None: |
| params = {"consistencyGroupId": consistency_group_id} |
| elif create_time_min and create_time_max: |
| params = { |
| "consistencyGroupId": consistency_group_id, |
| "createTime.MIN": create_time_min, |
| "createTime.MAX": create_time_max, |
| } |
| elif create_time_min or create_time_max: |
| if create_time_max is not None: |
| params = { |
| "consistencyGroupId": consistency_group_id, |
| "createTime.MAX": create_time_max, |
| } |
| elif create_time_min is not None: |
| params = { |
| "consistencyGroupId": consistency_group_id, |
| "createTime.MIN": create_time_min, |
| } |
| paged_result = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/snapshot", method="GET", params=params |
| ).object |
| snapshots = self._to_process(paged_result) |
| return snapshots |
| |
| def ex_expand_journal(self, consistency_group_id, size_gb): |
| """ |
| Expand the consistency group's journhal size in 100Gb increments. |
| |
| :param consistency_group_id: The consistency group's UUID |
| :type consistency_group_id: ``str`` |
| |
| :param size_gb: Gb in 100 Gb increments |
| :type size_gb: ``str`` |
| |
| :return: True if response_code contains either 'IN_PROGRESS' or 'OK' |
| otherwise False |
| :rtype: ``bool`` |
| """ |
| |
| expand_elm = ET.Element("expandJournal", {"id": consistency_group_id, "xmlns": TYPES_URN}) |
| ET.SubElement(expand_elm, "sizeGb").text = size_gb |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/expandJournal", |
| method="POST", |
| data=ET.tostring(expand_elm), |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_start_drs_failover_preview(self, consistency_group_id, snapshot_id): |
| """ |
| Brings a Consistency Group into PREVIEWING_SNAPSHOT mode. |
| |
| :param consistency_group_id: Id of the Consistency Group to put into |
| PRIEVEW_MODE |
| :type consistency_group_id: ``str`` |
| |
| :param snapshot_id: Id of the Snapshot to preview |
| :type snapshot_id: ``str`` |
| |
| :return: True if response_code contains either 'IN_PROGRESS' or 'OK' |
| otherwise False |
| :rtype: ``bool`` |
| """ |
| preview_elm = ET.Element( |
| "startPreviewSnapshot", |
| {"consistencyGroupId": consistency_group_id, "xmlns": TYPES_URN}, |
| ) |
| ET.SubElement(preview_elm, "snapshotId").text = snapshot_id |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/startPreviewSnapshot", |
| method="POST", |
| data=ET.tostring(preview_elm), |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_stop_drs_failover_preview(self, consistency_group_id): |
| """ |
| Takes a Consistency Group out of PREVIEW_MODE and back to DRS_MODE |
| |
| :param consistency_group_id: Consistency Group's Id |
| :type ``str`` |
| |
| :return: True if response_code contains either 'IN_PROGRESS' or 'OK' |
| otherwise False |
| :rtype: ``bool`` |
| """ |
| preview_elm = ET.Element( |
| "stopPreviewSnapshot", |
| {"consistencyGroupId": consistency_group_id, "xmlns": TYPES_URN}, |
| ) |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/stopPreviewSnapshot", |
| method="POST", |
| data=ET.tostring(preview_elm), |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_initiate_drs_failover(self, consistency_group_id): |
| """ |
| This method is irreversible. |
| It will failover the Consistency Group while removing it as well. |
| |
| :param consistency_group_id: Consistency Group's Id to failover |
| :type consistency_group_id: ``str`` |
| |
| :return: True if response_code contains either |
| IN_PROGRESS' or 'OK' otherwise False |
| :rtype: ``bool`` |
| """ |
| failover_elm = ET.Element( |
| "initiateFailover", |
| {"consistencyGroupId": consistency_group_id, "xmlns": TYPES_URN}, |
| ) |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/initiateFailover", |
| method="POST", |
| data=ET.tostring(failover_elm), |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def ex_delete_consistency_group(self, consistency_group_id): |
| """ |
| Delete's a Consistency Group |
| |
| :param consistency_group_id: Id of Consistency Group to delete |
| :type ``str`` |
| :return: True if response_code contains either |
| IN_PROGRESS' or 'OK' otherwise False |
| :rtype: ``bool`` |
| """ |
| delete_elm = ET.Element( |
| "deleteConsistencyGroup", {"id": consistency_group_id, "xmlns": TYPES_URN} |
| ) |
| response = self.connection.request_with_orgId_api_2( |
| "consistencyGroup/deleteConsistencyGroup", |
| method="POST", |
| data=ET.tostring(delete_elm), |
| ).object |
| response_code = findtext(response, "responseCode", TYPES_URN) |
| return response_code in ["IN_PROGRESS", "OK"] |
| |
| def _to_consistency_groups(self, object): |
| cgs = findall(object, "consistencyGroup", TYPES_URN) |
| return [self._to_process(el) for el in cgs] |
| |
| def _to_drs_snapshots(self, object): |
| snapshots = [] |
| for element in object.findall(fixxpath("snapshot", TYPES_URN)): |
| snapshots.append(self._to_process(element)) |
| return snapshots |
| |
| def _to_process(self, element): |
| return process_xml(ET.tostring(element)) |
| |
| def _format_csv(self, http_response): |
| text = http_response.read() |
| lines = str.splitlines(ensure_string(text)) |
| return [line.split(",") for line in lines] |
| |
| @staticmethod |
| def _get_tagging_asset_type(asset): |
| objecttype = type(asset) |
| if objecttype.__name__ in OBJECT_TO_TAGGING_ASSET_TYPE_MAP: |
| return OBJECT_TO_TAGGING_ASSET_TYPE_MAP[objecttype.__name__] |
| raise TypeError("Asset type %s cannot be tagged" % objecttype.__name__) |
| |
| def _list_nodes_single_page(self, params={}): |
| nodes = self.connection.request_with_orgId_api_2("server/server", params=params).object |
| return nodes |
| |
| def _to_tags(self, object): |
| tags = [] |
| for element in object.findall(fixxpath("tag", TYPES_URN)): |
| tags.append(self._to_tag(element)) |
| return tags |
| |
| def _to_tag(self, element): |
| tag_key = self._to_tag_key(element, from_tag_api=True) |
| return NttCisTag( |
| asset_type=findtext(element, "assetType", TYPES_URN), |
| asset_id=findtext(element, "assetId", TYPES_URN), |
| asset_name=findtext(element, "assetName", TYPES_URN), |
| datacenter=findtext(element, "datacenterId", TYPES_URN), |
| key=tag_key, |
| value=findtext(element, "value", TYPES_URN), |
| ) |
| |
| def _to_tag_keys(self, object): |
| keys = [] |
| for element in object.findall(fixxpath("tagKey", TYPES_URN)): |
| keys.append(self._to_tag_key(element)) |
| return keys |
| |
| def _to_tag_key(self, element, from_tag_api=False): |
| if from_tag_api: |
| id = findtext(element, "tagKeyId", TYPES_URN) |
| name = findtext(element, "tagKeyName", TYPES_URN) |
| else: |
| id = element.get("id") |
| name = findtext(element, "name", TYPES_URN) |
| |
| return NttCisTagKey( |
| id=id, |
| name=name, |
| description=findtext(element, "description", TYPES_URN), |
| value_required=self._str2bool(findtext(element, "valueRequired", TYPES_URN)), |
| display_on_report=self._str2bool(findtext(element, "displayOnReport", TYPES_URN)), |
| ) |
| |
| def _to_images(self, object, el_name="osImage"): |
| images = [] |
| locations = self.list_locations() |
| |
| # The CloudControl API will return all images |
| # in the current geographic region (even ones in |
| # datacenters the user's organisation does not have access to) |
| # |
| # We therefore need to filter out those images (since we can't |
| # get a NodeLocation for them) |
| location_ids = {location.id for location in locations} |
| |
| for element in object.findall(fixxpath(el_name, TYPES_URN)): |
| location_id = element.get("datacenterId") |
| if location_id in location_ids: |
| images.append(self._to_image(element, locations)) |
| |
| return images |
| |
| def _to_image(self, element, locations=None): |
| location_id = element.get("datacenterId") |
| if locations is None: |
| locations = self.list_locations(location_id) |
| |
| location = [loc for loc in locations if loc.id == location_id][0] |
| cpu_spec = self._to_cpu_spec(element.find(fixxpath("cpu", TYPES_URN))) |
| |
| if LooseVersion(self.connection.active_api_version) > LooseVersion("2.3"): |
| os_el = element.find(fixxpath("guest/operatingSystem", TYPES_URN)) |
| else: |
| os_el = element.find(fixxpath("operatingSystem", TYPES_URN)) |
| |
| if element.tag.endswith("customerImage"): |
| is_customer_image = True |
| else: |
| is_customer_image = False |
| extra = { |
| "description": findtext(element, "description", TYPES_URN), |
| "OS_type": os_el.get("family"), |
| "OS_displayName": os_el.get("displayName"), |
| "cpu": cpu_spec, |
| "memoryGb": findtext(element, "memoryGb", TYPES_URN), |
| "osImageKey": findtext(element, "osImageKey", TYPES_URN), |
| "created": findtext(element, "createTime", TYPES_URN), |
| "location": location, |
| "isCustomerImage": is_customer_image, |
| } |
| |
| return NodeImage( |
| id=element.get("id"), |
| name=str(findtext(element, "name", TYPES_URN)), |
| extra=extra, |
| driver=self.connection.driver, |
| ) |
| |
| def _to_nat_rules(self, object, network_domain): |
| rules = [] |
| for element in findall(object, "natRule", TYPES_URN): |
| rules.append(self._to_nat_rule(element, network_domain)) |
| |
| return rules |
| |
| def _to_nat_rule(self, element, network_domain): |
| return NttCisNatRule( |
| id=element.get("id"), |
| network_domain=network_domain, |
| internal_ip=findtext(element, "internalIp", TYPES_URN), |
| external_ip=findtext(element, "externalIp", TYPES_URN), |
| status=findtext(element, "state", TYPES_URN), |
| ) |
| |
| def _to_anti_affinity_rules(self, object): |
| rules = [] |
| for element in findall(object, "antiAffinityRule", TYPES_URN): |
| rules.append(self._to_anti_affinity_rule(element)) |
| return rules |
| |
| def _to_anti_affinity_rule(self, element): |
| node_list = [] |
| for node in findall(element, "serverSummary", TYPES_URN): |
| node_list.append(node.get("id")) |
| return NttCisAntiAffinityRule(id=element.get("id"), node_list=node_list) |
| |
| def _to_firewall_rules(self, object, network_domain): |
| rules = [] |
| locations = self.list_locations() |
| for element in findall(object, "firewallRule", TYPES_URN): |
| rules.append(self._to_firewall_rule(element, locations, network_domain)) |
| |
| return rules |
| |
| def _to_firewall_rule(self, element, locations, network_domain): |
| location_id = element.get("datacenterId") |
| location = list(filter(lambda x: x.id == location_id, locations))[0] |
| # For future dynamic rule creation |
| # return process_xml(ET.tostring(element)) |
| |
| return NttCisFirewallRule( |
| id=element.get("id"), |
| network_domain=network_domain, |
| name=findtext(element, "name", TYPES_URN), |
| action=findtext(element, "action", TYPES_URN), |
| ip_version=findtext(element, "ipVersion", TYPES_URN), |
| protocol=findtext(element, "protocol", TYPES_URN), |
| enabled=findtext(element, "enabled", TYPES_URN), |
| source=self._to_firewall_address(element.find(fixxpath("source", TYPES_URN))), |
| destination=self._to_firewall_address(element.find(fixxpath("destination", TYPES_URN))), |
| location=location, |
| status=findtext(element, "state", TYPES_URN), |
| ) |
| |
| def _to_firewall_address(self, element): |
| ip = element.find(fixxpath("ip", TYPES_URN)) |
| port = element.find(fixxpath("port", TYPES_URN)) |
| port_list = element.find(fixxpath("portList", TYPES_URN)) |
| address_list = element.find(fixxpath("ipAddressList", TYPES_URN)) |
| if address_list is None: |
| return NttCisFirewallAddress( |
| any_ip=ip.get("address") == "ANY", |
| ip_address=ip.get("address"), |
| ip_prefix_size=ip.get("prefixSize"), |
| port_begin=port.get("begin") if port is not None else None, |
| port_end=port.get("end") if port is not None else None, |
| port_list_id=port_list.get("id", None) if port_list is not None else None, |
| address_list_id=address_list.get("id") if address_list is not None else None, |
| ) |
| else: |
| return NttCisFirewallAddress( |
| any_ip=False, |
| ip_address=None, |
| ip_prefix_size=None, |
| port_begin=None, |
| port_end=None, |
| port_list_id=port_list.get("id", None) if port_list is not None else None, |
| address_list_id=address_list.get("id") if address_list is not None else None, |
| ) |
| |
| def _to_ip_blocks(self, object): |
| blocks = [] |
| locations = self.list_locations() |
| for element in findall(object, "publicIpBlock", TYPES_URN): |
| blocks.append(self._to_ip_block(element, locations)) |
| |
| return blocks |
| |
| def _to_ip_block(self, element, locations): |
| location_id = element.get("datacenterId") |
| location = list(filter(lambda x: x.id == location_id, locations))[0] |
| |
| return NttCisPublicIpBlock( |
| id=element.get("id"), |
| network_domain=self.ex_get_network_domain( |
| findtext(element, "networkDomainId", TYPES_URN) |
| ), |
| base_ip=findtext(element, "baseIp", TYPES_URN), |
| size=findtext(element, "size", TYPES_URN), |
| location=location, |
| status=findtext(element, "state", TYPES_URN), |
| ) |
| |
| def _to_networks(self, object): |
| networks = [] |
| locations = self.list_locations() |
| for element in findall(object, "network", NETWORK_NS): |
| networks.append(self._to_network(element, locations)) |
| |
| return networks |
| |
| def _to_network(self, element, locations): |
| multicast = False |
| if findtext(element, "multicast", NETWORK_NS) == "true": |
| multicast = True |
| |
| status = self._to_status(element.find(fixxpath("status", NETWORK_NS))) |
| |
| location_id = findtext(element, "location", NETWORK_NS) |
| location = list(filter(lambda x: x.id == location_id, locations))[0] |
| |
| return NttCisNetwork( |
| id=findtext(element, "id", NETWORK_NS), |
| name=findtext(element, "name", NETWORK_NS), |
| description=findtext(element, "description", NETWORK_NS), |
| location=location, |
| private_net=findtext(element, "privateNet", NETWORK_NS), |
| multicast=multicast, |
| status=status, |
| ) |
| |
| def _to_network_domains(self, object): |
| network_domains = [] |
| locations = self.list_locations() |
| for element in findall(object, "networkDomain", TYPES_URN): |
| network_domains.append(self._to_network_domain(element, locations)) |
| return network_domains |
| |
| def _to_network_domain(self, element, locations): |
| location_id = element.get("datacenterId") |
| location = list(filter(lambda x: x.id == location_id, locations))[0] |
| plan = findtext(element, "type", TYPES_URN) |
| if plan == "ESSENTIALS": |
| plan_type = NetworkDomainServicePlan.ESSENTIALS |
| else: |
| plan_type = NetworkDomainServicePlan.ADVANCED |
| return NttCisNetworkDomain( |
| id=element.get("id"), |
| name=findtext(element, "name", TYPES_URN), |
| description=findtext(element, "description", TYPES_URN), |
| plan=plan_type, |
| location=location, |
| status=findtext(element, "state", TYPES_URN), |
| ) |
| |
| def _to_vlans(self, object): |
| vlans = [] |
| locations = self.list_locations() |
| for element in findall(object, "vlan", TYPES_URN): |
| vlans.append(self._to_vlan(element, locations=locations)) |
| |
| return vlans |
| |
| def _to_vlan(self, element, locations): |
| location_id = element.get("datacenterId") |
| location = list(filter(lambda x: x.id == location_id, locations))[0] |
| ip_range = element.find(fixxpath("privateIpv4Range", TYPES_URN)) |
| ip6_range = element.find(fixxpath("ipv6Range", TYPES_URN)) |
| network_domain_el = element.find(fixxpath("networkDomain", TYPES_URN)) |
| network_domain = self.ex_get_network_domain(network_domain_el.get("id")) |
| return NttCisVlan( |
| id=element.get("id"), |
| name=findtext(element, "name", TYPES_URN), |
| description=findtext(element, "description", TYPES_URN), |
| network_domain=network_domain, |
| private_ipv4_range_address=ip_range.get("address"), |
| private_ipv4_range_size=int(ip_range.get("prefixSize")), |
| ipv6_range_address=ip6_range.get("address"), |
| ipv6_range_size=int(ip6_range.get("prefixSize")), |
| ipv4_gateway=findtext(element, "ipv4GatewayAddress", TYPES_URN), |
| ipv6_gateway=findtext(element, "ipv6GatewayAddress", TYPES_URN), |
| location=location, |
| status=findtext(element, "state", TYPES_URN), |
| ) |
| |
| def _to_locations(self, object): |
| locations = [] |
| for element in object.findall(fixxpath("datacenter", TYPES_URN)): |
| locations.append(self._to_location(element)) |
| |
| return locations |
| |
| def _to_location(self, element): |
| loc = NodeLocation( |
| id=element.get("id"), |
| name=findtext(element, "displayName", TYPES_URN), |
| country=findtext(element, "country", TYPES_URN), |
| driver=self, |
| ) |
| return loc |
| |
| def _to_cpu_spec(self, element): |
| return NttCisServerCpuSpecification( |
| cpu_count=int(element.get("count")), |
| cores_per_socket=int(element.get("coresPerSocket")), |
| performance=element.get("speed"), |
| ) |
| |
| def _to_vmware_tools(self, element): |
| status = None |
| if hasattr(element, "runningStatus"): |
| status = element.get("runningStatus") |
| |
| version_status = None |
| if hasattr(element, "version_status"): |
| version_status = element.get("version_status") |
| |
| api_version = None |
| if hasattr(element, "apiVersion"): |
| api_version = element.get("apiVersion") |
| |
| return NttCisServerVMWareTools( |
| status=status, version_status=version_status, api_version=api_version |
| ) |
| |
| def _to_scsi_controllers(self, elements): |
| return NttCisScsiController( |
| id=elements.get("id"), |
| adapter_type=elements.get("adapterType"), |
| bus_number=elements.get("busNumber"), |
| state=elements.get("state"), |
| ) |
| |
| def _to_disks(self, object): |
| disk_elements = object.findall(fixxpath("disk", TYPES_URN)) |
| return [self._to_disk(el) for el in disk_elements] |
| |
| def _to_disk(self, element): |
| return NttCisServerDisk( |
| id=element.get("id"), |
| scsi_id=int(element.get("scsiId")), |
| size_gb=int(element.get("sizeGb")), |
| speed=element.get("speed"), |
| state=element.get("state"), |
| ) |
| |
| def _to_snapshots(self, object): |
| """ |
| Takes an xml object and gathers indivicual elements |
| :param object: XML data |
| :return: list of dictionaries with keys |
| id |
| start_time |
| end_time |
| expiry_time |
| type of snapshot |
| state |
| """ |
| |
| snapshot_elements = object.findall(fixxpath("snapshot", TYPES_URN)) |
| return [self._to_snapshot(el) for el in snapshot_elements] |
| |
| def _to_snapshot(self, element): |
| return { |
| "id": element.get("id"), |
| "start_time": findtext(element, "startTime", TYPES_URN), |
| "end_time": findtext(element, "endTime", TYPES_URN), |
| "expiry_time": findtext(element, "expiryTime", TYPES_URN), |
| "type": findtext(element, "type", TYPES_URN), |
| "state": findtext(element, "state", TYPES_URN), |
| } |
| |
| def _to_ipv4_addresses(self, object): |
| ipv4_address_elements = object.findall(fixxpath("ipv4", TYPES_URN)) |
| return [self._to_ipv4_6_address(el) for el in ipv4_address_elements] |
| |
| def _to_ipv6_addresses(self, object): |
| ipv6_address_elements = object.findall(fixxpath("reservedIpv6Address", TYPES_URN)) |
| return [self._to_ipv4_6_address(el) for el in ipv6_address_elements] |
| |
| def _to_ipv4_6_address(self, element): |
| return NttCisReservedIpAddress( |
| element.get("datacenterId"), |
| element.get("exclusive"), |
| findtext(element, "vlanId", TYPES_URN), |
| findtext(element, "ipAddress", TYPES_URN), |
| description=findtext(element, "description", TYPES_URN), |
| ) |
| |
| def _to_windows(self, object): |
| snapshot_window_elements = object.findall(fixxpath("snapshotWindow", TYPES_URN)) |
| return [self._to_window(el) for el in snapshot_window_elements] |
| |
| def _to_window(self, element): |
| return { |
| "id": element.get("id"), |
| "day_of_week": element.get("dayOfWeek"), |
| "start_hour": element.get("startHour"), |
| "availability_status": element.get("availabilityStatus"), |
| } |
| |
| def _to_nodes(self, object): |
| node_elements = object.findall(fixxpath("server", TYPES_URN)) |
| return [self._to_node(el) for el in node_elements] |
| |
| def _to_node(self, element): |
| # Get all information at once and process in common/nttcis |
| # Below, future to dynamically generate classes |
| # return process_xml(ET.tostring(element)) |
| started = findtext(element, "started", TYPES_URN) |
| status = self._to_status(element.find(fixxpath("progress", TYPES_URN))) |
| dd_state = findtext(element, "state", TYPES_URN) |
| |
| node_state = self._get_node_state(dd_state, started, status.action) |
| |
| has_network_info = element.find(fixxpath("networkInfo", TYPES_URN)) is not None |
| cpu_spec = self._to_cpu_spec(element.find(fixxpath("cpu", TYPES_URN))) |
| |
| has_snapshot = element.find(fixxpath("snapshotService", TYPES_URN)) is not None |
| has_scsi = element.find(fixxpath("scsiController", TYPES_URN)) is not None |
| has_sata = element.find(fixxpath("sataController", TYPES_URN)) is not None |
| has_ide = element.find(fixxpath("ideController")) is not None |
| scsi_controllers = [] |
| disks = [] |
| if has_scsi: |
| scsi_controllers.append( |
| self._to_scsi_controllers(element.find(fixxpath("scsiController", TYPES_URN))) |
| ) |
| for scsi in element.findall(fixxpath("scsiController", TYPES_URN)): |
| disks.extend(self._to_disks(scsi)) |
| if has_sata: |
| for sata in element.findall(fixxpath("sataController", TYPES_URN)): |
| disks.extend(self._to_disks(sata)) |
| if has_ide: |
| for ide in element.findall(fixxpath("ideController", TYPES_URN)): |
| disks.extend(self._to_snapshot(ide)) |
| |
| # Vmware Tools |
| |
| # Version 2.3 or earlier |
| if LooseVersion(self.connection.active_api_version) < LooseVersion("2.4"): |
| vmware_tools = self._to_vmware_tools(element.find(fixxpath("vmwareTools", TYPES_URN))) |
| operation_system = element.find(fixxpath("operatingSystem", TYPES_URN)) |
| # Version 2.4 or later |
| else: |
| # vmtools_elm = fixxpath('guest/vmTools', TYPES_URN) |
| vmtools_elm = element.find(fixxpath("guest/vmTools", TYPES_URN)) |
| if vmtools_elm is not None: |
| vmware_tools = self._to_vmware_tools(vmtools_elm) |
| else: |
| vmware_tools = NttCisServerVMWareTools( |
| status=None, version_status=None, api_version=None |
| ) |
| operation_system = element.find(fixxpath("guest/operatingSystem", TYPES_URN)) |
| |
| extra = { |
| "description": findtext(element, "description", TYPES_URN), |
| "sourceImageId": findtext(element, "sourceImageId", TYPES_URN), |
| "networkId": findtext(element, "networkId", TYPES_URN), |
| "networkDomainId": element.find(fixxpath("networkInfo", TYPES_URN)).get( |
| "networkDomainId" |
| ) |
| if has_network_info |
| else None, |
| "datacenterId": element.get("datacenterId"), |
| "deployedTime": findtext(element, "createTime", TYPES_URN), |
| "window": ( |
| element.find(fixxpath("snapshotService/window", TYPES_URN)).get("dayOfWeek"), |
| element.find(fixxpath("snapshotService/window", TYPES_URN)).get("startHour"), |
| ) |
| if has_snapshot |
| else None, |
| "cpu": cpu_spec, |
| "memoryMb": int(findtext(element, "memoryGb", TYPES_URN)) * 1024, |
| "OS_id": operation_system.get("id"), |
| "OS_type": operation_system.get("family"), |
| "OS_displayName": operation_system.get("displayName"), |
| "status": status, |
| "scsi_controller": scsi_controllers, |
| "disks": disks, |
| "vmWareTools": vmware_tools, |
| } |
| |
| public_ip = findtext(element, "publicIpAddress", TYPES_URN) |
| |
| private_ip = ( |
| element.find(fixxpath("networkInfo/primaryNic", TYPES_URN)).get("privateIpv4") |
| if has_network_info |
| else element.find(fixxpath("nic", TYPES_URN)).get("privateIpv4") |
| ) |
| |
| extra["ipv6"] = ( |
| element.find(fixxpath("networkInfo/primaryNic", TYPES_URN)).get("ipv6") |
| if has_network_info |
| else element.find(fixxpath("nic", TYPES_URN)).get("ipv6") |
| ) |
| |
| n = Node( |
| id=element.get("id"), |
| name=findtext(element, "name", TYPES_URN), |
| state=node_state, |
| public_ips=[public_ip] if public_ip is not None else [], |
| private_ips=[private_ip] if private_ip is not None else [], |
| size=self.list_sizes()[0], |
| image=NodeImage( |
| extra["sourceImageId"], extra["OS_displayName"], self.connection.driver |
| ), |
| driver=self.connection.driver, |
| extra=extra, |
| ) |
| return n |
| |
| def _to_status(self, element): |
| if element is None: |
| return NttCisStatus() |
| s = NttCisStatus( |
| action=findtext(element, "action", TYPES_URN), |
| request_time=findtext(element, "requestTime", TYPES_URN), |
| user_name=findtext(element, "userName", TYPES_URN), |
| number_of_steps=findtext(element, "numberOfSteps", TYPES_URN), |
| step_name=findtext(element, "step/name", TYPES_URN), |
| step_number=findtext(element, "step_number", TYPES_URN), |
| step_percent_complete=findtext(element, "step/percentComplete", TYPES_URN), |
| failure_reason=findtext(element, "failureReason", TYPES_URN), |
| ) |
| return s |
| |
| def _to_ip_address_lists(self, object): |
| ip_address_lists = [] |
| for element in findall(object, "ipAddressList", TYPES_URN): |
| ip_address_lists.append(self._to_ip_address_list(element)) |
| |
| return ip_address_lists |
| |
| def _to_ip_address_list(self, element): |
| ipAddresses = [] |
| for ip in findall(element, "ipAddress", TYPES_URN): |
| ipAddresses.append(self._to_ip_address(ip)) |
| |
| child_ip_address_lists = [] |
| for child_ip_list in findall(element, "childIpAddressList", TYPES_URN): |
| child_ip_address_lists.append(self._to_child_ip_list(child_ip_list)) |
| |
| return NttCisIpAddressList( |
| id=element.get("id"), |
| name=findtext(element, "name", TYPES_URN), |
| description=findtext(element, "description", TYPES_URN), |
| ip_version=findtext(element, "ipVersion", TYPES_URN), |
| ip_address_collection=ipAddresses, |
| state=findtext(element, "state", TYPES_URN), |
| create_time=findtext(element, "createTime", TYPES_URN), |
| child_ip_address_lists=child_ip_address_lists, |
| ) |
| |
| def _to_child_ip_list(self, element): |
| return NttCisChildIpAddressList(id=element.get("id"), name=element.get("name")) |
| |
| def _to_ip_address(self, element): |
| return NttCisIpAddress( |
| begin=element.get("begin"), |
| end=element.get("end"), |
| prefix_size=element.get("prefixSize"), |
| ) |
| |
| def _to_port_lists(self, object): |
| port_lists = [] |
| for element in findall(object, "portList", TYPES_URN): |
| port_lists.append(self._to_port_list(element)) |
| |
| return port_lists |
| |
| def _to_port_list(self, element): |
| ports = [] |
| for port in findall(element, "port", TYPES_URN): |
| ports.append(self._to_port(element=port)) |
| |
| child_portlist_list = [] |
| for child in findall(element, "childPortList", TYPES_URN): |
| child_portlist_list.append(self._to_child_port_list(element=child)) |
| |
| return NttCisPortList( |
| id=element.get("id"), |
| name=findtext(element, "name", TYPES_URN), |
| description=findtext(element, "description", TYPES_URN), |
| port_collection=ports, |
| child_portlist_list=child_portlist_list, |
| state=findtext(element, "state", TYPES_URN), |
| create_time=findtext(element, "createTime", TYPES_URN), |
| ) |
| |
| def _image_needs_auth(self, image): |
| if not isinstance(image, NodeImage): |
| image = self.ex_get_image_by_id(image) |
| if image.extra["isCustomerImage"] and image.extra["OS_type"] == "UNIX": |
| return False |
| return True |
| |
| @staticmethod |
| def _to_port(element): |
| return NttCisPort(begin=element.get("begin"), end=element.get("end")) |
| |
| @staticmethod |
| def _to_child_port_list(element): |
| return NttCisChildPortList(id=element.get("id"), name=element.get("name")) |
| |
| @staticmethod |
| def _get_node_state(state, started, action): |
| try: |
| return NODE_STATE_MAP[(state, started, action)] |
| except KeyError: |
| if started == "true": |
| return NodeState.RUNNING |
| else: |
| return NodeState.TERMINATED |
| |
| @staticmethod |
| def _node_to_node_id(node): |
| return dd_object_to_id(node, Node) |
| |
| @staticmethod |
| def _location_to_location_id(location): |
| return dd_object_to_id(location, NodeLocation) |
| |
| @staticmethod |
| def _vlan_to_vlan_id(vlan): |
| return dd_object_to_id(vlan, NttCisVlan) |
| |
| @staticmethod |
| def _image_to_image_id(image): |
| return dd_object_to_id(image, NodeImage) |
| |
| @staticmethod |
| def _network_to_network_id(network): |
| return dd_object_to_id(network, NttCisNetwork) |
| |
| @staticmethod |
| def _anti_affinity_rule_to_anti_affinity_rule_id(rule): |
| return dd_object_to_id(rule, NttCisAntiAffinityRule) |
| |
| @staticmethod |
| def _network_domain_to_network_domain_id(network_domain): |
| return dd_object_to_id(network_domain, NttCisNetworkDomain) |
| |
| @staticmethod |
| def _tag_key_to_tag_key_id(tag_key): |
| return dd_object_to_id(tag_key, NttCisTagKey) |
| |
| @staticmethod |
| def _tag_key_to_tag_key_name(tag_key): |
| return dd_object_to_id(tag_key, NttCisTagKey, id_value="name") |
| |
| @staticmethod |
| def _ip_address_list_to_ip_address_list_id(ip_addr_list): |
| return dd_object_to_id(ip_addr_list, NttCisIpAddressList, id_value="id") |
| |
| @staticmethod |
| def _child_ip_address_list_to_child_ip_address_list_id(child_ip_addr_list): |
| return dd_object_to_id(child_ip_addr_list, NttCisChildIpAddressList, id_value="id") |
| |
| @staticmethod |
| def _port_list_to_port_list_id(port_list): |
| return dd_object_to_id(port_list, NttCisPortList, id_value="id") |
| |
| @staticmethod |
| def _child_port_list_to_child_port_list_id(child_port_list): |
| return dd_object_to_id(child_port_list, NttCisChildPortList, id_value="id") |
| |
| @staticmethod |
| def _str2bool(string): |
| return string.lower() in ("true") |