| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import logging |
| import functools |
| import bambou |
| |
# Module-scoped logger: records carry this module's name instead of "root"
# and, by default, still propagate to handlers configured on the root logger.
LOG = logging.getLogger(__name__)
| |
| |
class recreate_session_on_timeout(object):
    """Method decorator retrying a call once after renewing the session.

    The decorator is a descriptor: attribute lookup goes through
    ``__get__`` which hands out a wrapper around the decorated method.
    When the method raises a ``BambouHTTPError`` whose response status is
    401, the wrapper stores ``instance.api_client.new_session()`` in
    ``instance.session`` and calls the method a second time. Any other
    error, and any error raised by the second attempt, propagates.
    """

    def __init__(self, method):
        """Remember the undecorated function.

        @param: method: function taking the instance as first argument
        """
        self.method = method

    def __get__(self, obj=None, objtype=None):
        """Build the retrying wrapper for one attribute lookup.

        @param: obj: instance the attribute was looked up on, or None when
                it was looked up on the class itself
        @param: objtype: owner class (unused)
        @return: function with the metadata of the decorated method
        """
        method = self.method

        @functools.wraps(method)
        def _wrapper(*args, **kwargs):
            # Looked up on an instance: bind it as the first argument.
            # Looked up on the class: the caller passes the instance
            # explicitly, exactly as with a plain function.
            if obj is not None:
                args = (obj,) + args
            try:
                return method(*args, **kwargs)
            except bambou.exceptions.BambouHTTPError as e:
                # Guarded lookup so that an error without a response
                # re-raises the original exception instead of failing
                # here with an AttributeError.
                response = getattr(e.connection, 'response', None)
                if getattr(response, 'status_code', None) != 401:
                    # bare raise keeps the original traceback
                    raise
                instance = args[0]
                instance.session = instance.api_client.new_session()
                return method(*args, **kwargs)

        return _wrapper
| |
| |
class VSDHelpers(object):
    """Lookup helpers on top of a vspk session.

    Every public ``get_*`` method logs an error and returns a falsy value
    when nothing could be fetched; it never raises for "not found".
    Methods decorated with ``recreate_session_on_timeout`` are retried once
    with a fresh session when the call fails with HTTP status 401.
    """

    def __init__(self, api_client):
        """
        Create a wrapper
        providing a session and a vspk module, both taken from api_client

        @param: api_client: object exposing ``session``, ``version`` and
                ``import_vspk()``
        """
        self.api_client = api_client
        self.session = api_client.session
        self.vspk = api_client.import_vspk(api_client.version)

    def update_vsd_session(self, api_client=None):
        """
        This method is used when Helper is
        initialized before we create a new_session.

        @param: api_client: optional client to switch to; when omitted the
                session is re-read from the client given at construction
        """
        if api_client:
            # Keep the stored client in step with the session: the 401
            # retry of recreate_session_on_timeout re-authenticates through
            # self.api_client and must not fall back to the previous one.
            self.api_client = api_client
            self.session = api_client.session
            self.vspk = api_client.import_vspk(api_client.version)
        else:
            self.session = self.api_client.session

    # ------------------------------------------------------------------
    # private helpers shared by the lookup methods below
    # ------------------------------------------------------------------

    def _first_from_root(self, fetcher, description, vsd_filter):
        """Fetch the first object under the session user; filter mandatory.

        @param: fetcher: name of the fetcher attribute on session.user
        @param: description: object name used in the error message
        @param: vsd_filter: filter string handed to get_first()
        @return: first matching object, or None (after logging an error)
        """
        if not vsd_filter:
            LOG.error('a filter is required')
            return None
        found = getattr(self.session.user, fetcher).get_first(
            filter=vsd_filter)
        if not found:
            LOG.error('could not fetch the {} matching the filter "{}"'
                      .format(description, vsd_filter))
        return found

    def _first_child(self, parent, parent_class, fetcher, description,
                     vsd_filter):
        """Fetch the first child of parent, or search below the user.

        @param: parent: vspk object, an id, or a falsy value for "none"
        @param: parent_class: name of the vspk class parent belongs to;
                resolved lazily, only when a parent is given
        @param: fetcher: name of the fetcher attribute to query
        @param: description: object name used in the error message
        @param: vsd_filter: filter string handed to get_first()
        @return: first matching object, or None (after logging an error)
        """
        found = None
        if parent:
            parent_type = getattr(self.vspk, parent_class)
            if not isinstance(parent, parent_type):
                # anything that is not the vspk object is taken as an id
                parent = parent_type(id=parent)
            found = getattr(parent, fetcher).get_first(filter=vsd_filter)
        elif vsd_filter:
            found = getattr(self.session.user, fetcher).get_first(
                filter=vsd_filter)
        if not found:
            LOG.error('could not fetch the {} matching the filter "{}"'
                      .format(description, vsd_filter))
        return found

    def _acl_entries(self, direction, templates_fetcher, template_class,
                     entries_fetcher, vsd_filter):
        """Fetch the entries of every ACL template matching the filter.

        @param: direction: 'ingress' or 'egress', used in the error message
        @param: templates_fetcher: fetcher attribute on session.user
        @param: template_class: name of the vspk ACL template class
        @param: entries_fetcher: fetcher attribute on the template object
        @param: vsd_filter: filter string selecting the templates
        @return: list holding one ``get()`` result per matching template,
                 or None (after logging an error)
        """
        if not vsd_filter:
            LOG.error('a filter is required')
            return None
        templates = getattr(self.session.user, templates_fetcher).get(
            filter=vsd_filter)
        if not templates:
            LOG.error('could not fetch the {} acl entries (templates) '
                      'matching the filter "{}"'
                      .format(direction, vsd_filter))
            return None
        template_type = getattr(self.vspk, template_class)
        return [getattr(template_type(id=template.id), entries_fetcher).get()
                for template in templates]

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    @recreate_session_on_timeout
    def add_user_to_group(self, enterprise, user=None, group=None,
                          usr_filter=None, grp_filter=None):
        """
        Add user to a group on VSD.
        For example: Add csproot to cms group
        Two calling conventions are supported:
        1. user and group are both vspk objects (NUUser / NUGroup)
        2. usr_filter and grp_filter are both given; user and group are
           then looked up inside the enterprise, which can be an id or a
           NUEnterprise object
        Any other combination, or a filter without a match, is logged
        and nothing is assigned.
        """
        if isinstance(group, self.vspk.NUGroup):
            if not isinstance(user, self.vspk.NUUser):
                LOG.error('a user object is required together with '
                          'a group object')
                return
        elif usr_filter and grp_filter:
            if not isinstance(enterprise, self.vspk.NUEnterprise):
                enterprise = self.vspk.NUEnterprise(id=enterprise)
            # Validate each lookup before using its result; dereferencing
            # an unmatched group would fail with an AttributeError.
            group = enterprise.groups.get_first(filter=grp_filter)
            if not group:
                LOG.error('could not fetch the group matching the filter "{}"'
                          .format(grp_filter))
                return
            user = enterprise.users.get_first(filter=usr_filter)
            if not user:
                LOG.error('could not fetch the user matching the filter "{}"'
                          .format(usr_filter))
                return
        else:
            LOG.error('either user and group objects or '
                      'usr_filter and grp_filter are required')
            return
        # assign() receives the complete member list, so the current
        # members are fetched first and the new user is appended.
        members = group.users.get()
        members.append(user)
        group.assign(members, self.vspk.NUUser)

    def set_name_filter(self, name):
        """ set name filter for vsd query
            @param: name:  string name
            @return:  filter string
        """
        return 'name is "{}"'.format(name)

    def set_externalID_filter(self, id):
        """ set externalID filter for vsd query
            @param: id:  string externalID
            @return:  filter string
        """
        return 'externalID is "{}"'.format(id)

    @recreate_session_on_timeout
    def get_enterprise(self, filter):
        """ get_enterprise
            @params: enterprise filter following vspk filter structure
            @return: enterprise object, None when not found
            @Example:
            self.vsd.get_enterprise(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_from_root('enterprises', 'enterprise', filter)

    @recreate_session_on_timeout
    def get_l2domain(self, enterprise=None, filter=None):
        """ get_l2domain
            @params: enterprise object or enterprise id
                     filter following vspk filter structure
            @return: l2 domain object, None when not found
            @Example:
            self.vsd.get_l2domain(enterprise=enterprise,
                                  filter='name == "{}"'.format(name))
            self.vsd.get_l2domain(enterprise=enterprise_id,
                                  filter='name == "{}"'.format(name))
            self.vsd.get_l2domain(filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(enterprise, 'NUEnterprise', 'l2_domains',
                                 'l2 domain', filter)

    @recreate_session_on_timeout
    def get_domain(self, enterprise=None, filter=None):
        """ get_domain
            @params: enterprise object or enterprise id
                     filter following vspk filter structure
            @return: domain object, None when not found
            @Example:
            self.vsd.get_domain(enterprise=enterprise,
                                filter='name == "{}"'.format(name))
            self.vsd.get_domain(enterprise=enterprise_id,
                                filter='name == "{}"'.format(name))
            self.vsd.get_domain(filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(enterprise, 'NUEnterprise', 'domains',
                                 'domain', filter)

    @recreate_session_on_timeout
    def get_domain_template(self, enterprise=None, filter=None):
        """ get_domain_template
            @params: enterprise object or enterprise id
                     filter following vspk filter structure
            @return: domain template object, None when not found
            @Example:
            self.vsd.get_domain_template(enterprise=enterprise,
                                         filter='name == "{}"'.format(name))
            self.vsd.get_domain_template(enterprise=enterprise_id,
                                         filter='name == "{}"'.format(name))
            self.vsd.get_domain_template(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(enterprise, 'NUEnterprise',
                                 'domain_templates', 'domain template',
                                 filter)

    @recreate_session_on_timeout
    def get_zone(self, domain=None, filter=None):
        """ get_zone
            @params: domain object or domain id
                     filter following vspk filter structure
            @return: zone object, None when not found
            @Example:
            self.vsd.get_zone(domain=domain,
                              filter='name == "{}"'.format(name))
            self.vsd.get_zone(domain=domain_id,
                              filter='name == "{}"'.format(name))
            self.vsd.get_zone(filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(domain, 'NUDomain', 'zones', 'zone', filter)

    @recreate_session_on_timeout
    def get_subnet(self, zone=None, filter=None):
        """ get_subnet
            @params: zone object or zone id
                     filter following vspk filter structure
            @return: subnet object, None when not found
            @Example:
            self.vsd.get_subnet(zone=zone,
                                filter='name == "{}"'.format(name))
            self.vsd.get_subnet(zone=zone_id,
                                filter='name == "{}"'.format(name))
            self.vsd.get_subnet(filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(zone, 'NUZone', 'subnets', 'subnet', filter)

    @recreate_session_on_timeout
    def get_subnet_from_domain(self, domain=None, filter=None):
        """ get_subnet_from_domain
            @params: domain object or domain id
                     filter following vspk filter structure
            @return: subnet object, None when not found
            @Example:
            self.vsd.get_subnet_from_domain(
                domain=domain, filter='name == "{}"'.format(name))
            self.vsd.get_subnet_from_domain(
                domain=domain_id, filter='name == "{}"'.format(name))
            self.vsd.get_subnet_from_domain(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(domain, 'NUDomain', 'subnets', 'subnet',
                                 filter)

    @recreate_session_on_timeout
    def get_vm(self, subnet=None, filter=None):
        """ get_vm
            @params: subnet object or subnet id
                     filter following vspk filter structure
            @return: vm object, None when not found
            @Example:
            self.vsd.get_vm(subnet=subnet,
                            filter='name == "{}"'.format(name))
            self.vsd.get_vm(subnet=subnet_id,
                            filter='name == "{}"'.format(name))
            self.vsd.get_vm(filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_child(subnet, 'NUSubnet', 'vms', 'vm', filter)

    @recreate_session_on_timeout
    def get_subnet_dhcpoptions(self, subnet=None, filter=None):
        """ get_subnet_dhcpoptions
            @params: subnet object or
                     subnet filter following vspk filter structure
            @return: subnet dhcpoptions object, None when the subnet
                     could not be resolved
            @Example:
            self.vsd.get_subnet_dhcpoptions(subnet=subnet)
            self.vsd.get_subnet_dhcpoptions(
                filter='externalID == "{}"'.format(subnet_externalID))
        """
        if not isinstance(subnet, self.vspk.NUSubnet):
            subnet = self._first_from_root('subnets', 'subnet', filter)
            if not subnet:
                # missing filter or no match; the helper already logged it
                return None
        dhcp_options = subnet.dhcp_options.get()
        if not dhcp_options:
            if filter:
                LOG.error('could not fetch the dhcp options on the subnet '
                          'matching the filter "{}"'
                          .format(filter))
            else:
                LOG.error('could not fetch the dhcp options on the subnet')
        return dhcp_options

    @recreate_session_on_timeout
    def get_vport(self, subnet, filter):
        """ get_vport
            @params: subnet object
                     vport filter following vspk filter structure
            @return: vport object, None when not found
            @Example:
            self.vsd.get_vport(subnet=subnet,
                filter='externalID == "{}"'.format(ext_id))
        """
        if not isinstance(subnet, self.vspk.NUSubnet):
            LOG.error('a subnet is required')
            return None
        if not filter:
            LOG.error('a filter is required')
            return None
        vport = subnet.vports.get_first(filter=filter)
        if not vport:
            LOG.error('could not fetch the vport from the subnet '
                      'matching the filter "{}"'
                      .format(filter))
        return vport

    @recreate_session_on_timeout
    def get_vm_interface(self, filter):
        """ get_vm_interface
            @params: vm interface filter following vspk filter structure
            @return: vm interface object, None when not found
            @Example:
            self.vsd.get_vm_interface(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_from_root('vm_interfaces', 'vm interface', filter)

    @recreate_session_on_timeout
    def get_vm_interface_policydecisions(self, vm_interface=None, filter=None):
        """ get_vm_interface_policydecisions
            @params: vm interface object or
                     vm interface filter following vspk filter structure
            @return: result of NUPolicyDecision(...).fetch() for the
                     interface's policy_decision_id, None when the
                     interface could not be resolved
            @Example:
            self.vsd.get_vm_interface_policydecisions(vm_interface=interface)
            self.vsd.get_vm_interface_policydecisions(
                filter='externalID == "{}"'.format(vm_interface_externalID))
        """
        if not isinstance(vm_interface, self.vspk.NUVMInterface):
            vm_interface = self._first_from_root('vm_interfaces',
                                                 'vm interface', filter)
            if not vm_interface:
                # missing filter or no match; the helper already logged it
                return None
        # NOTE(review): bambou's fetch() presumably returns an
        # (object, connection) tuple, which would make the emptiness check
        # below always pass - confirm before relying on the error log.
        policy_decisions = self.vspk.NUPolicyDecision(
            id=vm_interface.policy_decision_id).fetch()
        if not policy_decisions:
            if filter:
                LOG.error('could not fetch the policy decisions on the '
                          'vm interface matching the filter "{}"'
                          .format(filter))
            else:
                LOG.error('could not fetch the policy decisions '
                          'on the vm interface')
        return policy_decisions

    @recreate_session_on_timeout
    def get_vm_interface_dhcpoptions(self, vm_interface=None, filter=None):
        """ get_vm_interface_dhcpoptions
            @params: vm interface object or
                     vm interface filter following vspk filter structure
            @return: vm interface dhcpoptions object, None when the
                     interface could not be resolved
            @Example:
            self.vsd.get_vm_interface_dhcpoptions(vm_interface=vm_interface)
            self.vsd.get_vm_interface_dhcpoptions(
                filter='externalID == "{}"'.format(vm_interface_externalID))
        """
        if not isinstance(vm_interface, self.vspk.NUVMInterface):
            vm_interface = self._first_from_root('vm_interfaces',
                                                 'vm interface', filter)
            if not vm_interface:
                # missing filter or no match; the helper already logged it
                return None
        dhcp_options = vm_interface.dhcp_options.get()
        if not dhcp_options:
            if filter:
                LOG.error('could not fetch the dhcp options on the '
                          'vm interface matching the filter "{}"'
                          .format(filter))
            else:
                LOG.error('could not fetch the dhcp options on the '
                          'vm interface')
        return dhcp_options

    @recreate_session_on_timeout
    def get_ingress_acl_entry(self, filter):
        """ get_ingress_acl_entry
            @params: ingress acl entry filter following vspk filter structure
            @return: ingress acl entry object, None when not found
            @Example:
            self.vsd.get_ingress_acl_entry(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_from_root('ingress_acl_entry_templates',
                                     'ingress acl entry', filter)

    @recreate_session_on_timeout
    def get_egress_acl_entry(self, filter):
        """ get_egress_acl_entry
            @params: egress acl entry filter following vspk filter structure
            @return: egress acl entry object, None when not found
            @Example:
            self.vsd.get_egress_acl_entry(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_from_root('egress_acl_entry_templates',
                                     'egress acl entry', filter)

    @recreate_session_on_timeout
    def get_qoss(self, vport):
        """ get_qoss
            @params: vport object
            @return: qoss object
            @Example:
            self.vsd.get_qoss(vport=vport)
        """
        if not isinstance(vport, self.vspk.NUVPort):
            LOG.error('a vport is required')
            return None
        qoss = vport.qoss.get()
        if not qoss:
            LOG.error('could not fetch the qoss from the vport')
        return qoss

    @recreate_session_on_timeout
    def get_floating_ip(self, filter):
        """ get_floating_ip
            @params: floating ip filter following vspk filter structure
            @return: floating ip object, None when not found
            @Example:
            self.vsd.get_floating_ip(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_from_root('floating_ips', 'floating ip', filter)

    @recreate_session_on_timeout
    def get_ingress_acl_entries(self, filter):
        """ get_ingress_acl_entries
            @params: ingress acl templates filter following vspk
                     filter structure
            @return: list with, per matching template, the result of
                     fetching its ingress acl entries; None when no
                     template matches
            @Example:
            self.vsd.get_ingress_acl_entries(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._acl_entries('ingress', 'ingress_acl_templates',
                                 'NUIngressACLTemplate',
                                 'ingress_acl_entry_templates', filter)

    @recreate_session_on_timeout
    def get_egress_acl_entries(self, filter):
        """ get_egress_acl_entries
            @params: egress acl templates filter
                     following vspk filter structure
            @return: list with, per matching template, the result of
                     fetching its egress acl entries; None when no
                     template matches
            @Example:
            self.vsd.get_egress_acl_entries(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._acl_entries('egress', 'egress_acl_templates',
                                 'NUEgressACLTemplate',
                                 'egress_acl_entry_templates', filter)

    @recreate_session_on_timeout
    def get_shared_network_resource(self, filter):
        """ get_shared_network_resource
            @params: shared network resource filter
                     following vspk filter structure
            @return: shared network resource object, None when not found
            @Example:
            self.vsd.get_shared_network_resource(
                filter='externalID == "{}"'.format(ext_id))
        """
        return self._first_from_root('shared_network_resources',
                                     'shared network resource', filter)

    @recreate_session_on_timeout
    def get_virtualip(self, vport, filter):
        """ get_virtualip
            @params: vport object
                     virtualip filter following vspk filter structure
            @return: virtualip object, None when not found
            @Example:
            self.vsd.get_virtualip(vport=vport,
                filter='externalID == "{}"'.format(ext_id))
        """
        if not isinstance(vport, self.vspk.NUVPort):
            LOG.error('a vport is required')
            return None
        if not filter:
            LOG.error('a filter is required')
            return None
        virtualip = vport.virtual_ips.get_first(filter=filter)
        if not virtualip:
            LOG.error('could not fetch the virtualip matching the filter "{}"'
                      .format(filter))
        return virtualip