| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| """ Tests for Multiple IPs per NIC feature |
| |
| Test Plan: https://cwiki.apache.org/confluence/display/CLOUDSTACK/Multiple+IPs+per+NIC+Test+Plan |
| |
| Issue Link: https://issues.apache.org/jira/browse/CLOUDSTACK-4840 |
| |
| Design Document: https://cwiki.apache.org/confluence/display/CLOUDSTACK/Multiple+IP+address+per+NIC |
| """ |
| from marvin.cloudstackTestCase import cloudstackTestCase |
| from marvin.integration.lib.utils import (cleanup_resources, |
| validateList, |
| random_gen) |
| from marvin.integration.lib.base import (Account, |
| ServiceOffering, |
| Network, |
| VirtualMachine, |
| VpcOffering, |
| VPC, |
| NIC, |
| Domain, |
| PublicIPAddress) |
| from marvin.integration.lib.common import (get_domain, |
| get_zone, |
| get_template, |
| get_free_vlan, |
| setSharedNetworkParams, |
| createEnabledNetworkOffering, |
| shouldTestBeSkipped) |
| |
| from nose.plugins.attrib import attr |
| from marvin.codes import PASS, ISOLATED_NETWORK, VPC_NETWORK, SHARED_NETWORK |
| from ddt import ddt, data |
| |
| def createNetwork(self, networkType): |
| """Create a network of given type (isolated/shared/isolated in VPC)""" |
| |
| network = None |
| |
| if networkType == ISOLATED_NETWORK: |
| try: |
| network = Network.create(self.apiclient,self.services["isolated_network"], |
| networkofferingid=self.isolated_network_offering.id, |
| accountid=self.account.name,domainid=self.account.domainid, |
| zoneid=self.zone.id) |
| except Exception as e: |
| self.fail("Isolated network creation failed because: %s" % e) |
| |
| elif networkType == SHARED_NETWORK: |
| physical_network, vlan = get_free_vlan(self.api_client, self.zone.id) |
| |
| #create network using the shared network offering created |
| self.services["shared_network"]["acltype"] = "domain" |
| self.services["shared_network"]["vlan"] = vlan |
| self.services["shared_network"]["networkofferingid"] = self.shared_network_offering.id |
| self.services["shared_network"]["physicalnetworkid"] = physical_network.id |
| |
| self.services["shared_network"] = setSharedNetworkParams(self.services["shared_network"]) |
| |
| try: |
| network = Network.create(self.api_client, self.services["shared_network"], |
| networkofferingid=self.shared_network_offering.id, zoneid=self.zone.id) |
| self.cleanup.append(network) |
| except Exception as e: |
| self.fail("Shared Network creation failed because: %s" % e) |
| |
| elif networkType == VPC_NETWORK: |
| self.services["vpc"]["cidr"] = "10.1.1.1/16" |
| self.debug("creating a VPC network in the account: %s" % |
| self.account.name) |
| vpc = VPC.create(self.apiclient, self.services["vpc"], |
| vpcofferingid=self.vpc_off.id, zoneid=self.zone.id, |
| account=self.account.name, domainid=self.account.domainid) |
| vpcs = VPC.list(self.apiclient, id=vpc.id) |
| self.assertEqual(validateList(vpcs)[0], PASS, "VPC list validation failed, vpc list is %s" % vpcs) |
| |
| network = Network.create(self.api_client,self.services["isolated_network"], |
| networkofferingid=self.isolated_network_offering_vpc.id, |
| accountid=self.account.name,domainid=self.account.domainid, |
| zoneid=self.zone.id, vpcid=vpc.id, gateway="10.1.1.1", netmask="255.255.255.0") |
| return network |
| |
| def CreateEnabledNetworkOffering(apiclient, networkServices): |
| """Create network offering of given services and enable it""" |
| |
| result = createEnabledNetworkOffering(apiclient, networkServices) |
| assert result[0] == PASS, "Network offering creation/enabling failed due to %s" % result[2] |
| return result[1] |
| |
| @ddt |
| class TestBasicOperations(cloudstackTestCase): |
| """Test Basic operations (add/remove/list) IP to/from NIC |
| """ |
| |
| @classmethod |
| def setUpClass(cls): |
| cloudstackTestClient = super(TestBasicOperations,cls).getClsTestClient() |
| cls.api_client = cloudstackTestClient.getApiClient() |
| |
| # Fill services from the external config file |
| cls.services = cloudstackTestClient.getConfigParser().parsedDict |
| |
| # Get Zone, Domain and templates |
| cls.domain = get_domain(cls.api_client, cls.services) |
| cls.zone = get_zone(cls.api_client, cls.services) |
| cls.mode = str(cls.zone.networktype).lower() |
| cls.template = get_template( |
| cls.api_client, |
| cls.zone.id, |
| cls.services["ostype"] |
| ) |
| cls.services["virtual_machine"]["zoneid"] = cls.zone.id |
| cls.services["virtual_machine"]["template"] = cls.template.id |
| cls.service_offering = ServiceOffering.create( |
| cls.api_client, |
| cls.services["service_offering"] |
| ) |
| cls._cleanup = [cls.service_offering] |
| cls.services["shared_network_offering"]["specifyVlan"] = "True" |
| cls.services["shared_network_offering"]["specifyIpRanges"] = "True" |
| |
| cls.shared_network_offering = CreateEnabledNetworkOffering(cls.api_client, |
| cls.services["shared_network_offering"]) |
| cls._cleanup.append(cls.shared_network_offering) |
| |
| if cls.mode == "advanced": |
| cls.isolated_network_offering = CreateEnabledNetworkOffering(cls.api_client, |
| cls.services["isolated_network_offering"]) |
| cls._cleanup.append(cls.isolated_network_offering) |
| cls.isolated_network_offering_vpc = CreateEnabledNetworkOffering(cls.api_client, |
| cls.services["nw_offering_isolated_vpc"]) |
| cls._cleanup.append(cls.isolated_network_offering_vpc) |
| cls.vpc_off = VpcOffering.create(cls.api_client, cls.services["vpc_offering"]) |
| cls.vpc_off.update(cls.api_client, state='Enabled') |
| cls._cleanup.append(cls.vpc_off) |
| return |
| |
| @classmethod |
| def tearDownClass(cls): |
| try: |
| # Cleanup resources used |
| cleanup_resources(cls.api_client, cls._cleanup) |
| except Exception as e: |
| raise Exception("Warning: Exception during cleanup : %s" % e) |
| return |
| |
| def setUp(self): |
| self.apiclient = self.testClient.getApiClient() |
| self.dbclient = self.testClient.getDbConnection() |
| self.cleanup = [ ] |
| return |
| |
| def tearDown(self): |
| try: |
| # Clean up, terminate the resources created |
| cleanup_resources(self.apiclient, self.cleanup) |
| self.cleanup[:] = [] |
| except Exception as e: |
| raise Exception("Warning: Exception during cleanup : %s" % e) |
| return |
| |
| def VerifyStaticNatForPublicIp(self, ipaddressid, natrulestatus): |
| """ List public IP and verify that NAT rule status for the IP is as desired """ |
| |
| publiciplist = PublicIPAddress.list(self.apiclient, id=ipaddressid, listall=True) |
| self.assertEqual(validateList(publiciplist)[0], PASS, "Public IP list validation failed") |
| self.assertEqual(publiciplist[0].isstaticnat, natrulestatus, "isstaticnat should be %s, it is %s" % |
| (natrulestatus, publiciplist[0].isstaticnat)) |
| |
| return |
| |
| @data(ISOLATED_NETWORK, SHARED_NETWORK, VPC_NETWORK) |
| @attr(tags=["basic","advanced"]) |
| def test_add_ip_to_nic(self, value): |
| """ Add secondary IP to NIC of a VM""" |
| |
| # Steps: |
| # 1. Create Account and create network in it (isoalted/ shared/ vpc) |
| # 2. Deploy a VM in this network and account |
| # 3. Add secondary IP to the default nic of VM |
| # 4. Try to add the same IP again |
| # 5. Try to add secondary IP providing wrong virtual machine id |
| # 6. Try to add secondary IP with correct virtual machine id but wrong IP address |
| |
| # Validations: |
| # 1. Step 3 should succeed |
| # 2. Step 4 should fail |
| # 3. Step 5 should should fail |
| # 4. Step 6 should fail |
| |
| self.account = Account.create(self.apiclient,self.services["account"],domainid=self.domain.id) |
| self.cleanup.append(self.account) |
| |
| if(shouldTestBeSkipped(networkType=value, zoneType=self.mode)): |
| self.skipTest("Skipping test as %s network is not supported in basic zone" % value) |
| |
| network = createNetwork(self, value) |
| |
| try: |
| virtual_machine = VirtualMachine.create(self.apiclient,self.services["virtual_machine"], |
| networkids=[network.id],serviceofferingid=self.service_offering.id, |
| accountid=self.account.name,domainid=self.account.domainid) |
| except Exception as e: |
| self.fail("vm creation failed: %s" % e) |
| |
| try: |
| ipaddress_1 = NIC.addIp(self.apiclient, id=virtual_machine.nic[0].id) |
| except Exception as e: |
| self.fail("Failed while adding secondary IP to NIC of vm %s" % virtual_machine.id) |
| |
| try: |
| NIC.addIp(self.apiclient, id=virtual_machine.nic[0].id, ipaddress=ipaddress_1.ipaddress) |
| self.debug("Adding already added secondary IP %s to NIC of vm %s succeeded, should have failed" % |
| (ipaddress_1.ipaddress, virtual_machine.id)) |
| except Exception as e: |
| self.debug("Failed while adding already added secondary IP to NIC of vm %s" % virtual_machine.id) |
| |
| try: |
| NIC.addIp(self.apiclient, id=(virtual_machine.nic[0].id + random_gen())) |
| self.fail("Adding secondary IP with wrong NIC id succeded, it shoud have failed") |
| except Exception as e: |
| self.debug("Failed while adding secondary IP to wrong NIC") |
| |
| try: |
| NIC.addIp(self.apiclient, id=virtual_machine.nic[0].id, ipaddress = "255.255.255.300") |
| self.fail("Adding secondary IP with wrong ipaddress succeded, it should have failed") |
| except Exception as e: |
| self.debug("Failed while adding wrong secondary IP to NIC of VM %s" % virtual_machine.id) |
| return |
| |
| @data(ISOLATED_NETWORK, SHARED_NETWORK, VPC_NETWORK) |
| @attr(tags=["basic","advanced"]) |
| def test_remove_ip_from_nic(self, value): |
| """ Remove secondary IP from NIC of a VM""" |
| |
| # Steps: |
| # 1. Create Account and create network in it (isoalted/ shared/ vpc) |
| # 2. Deploy a VM in this network and account |
| # 3. Add secondary IP to the default nic of VM |
| # 4. Remove the secondary IP |
| # 5. Try to remove secondary ip by giving incorrect ipaddress id |
| |
| # Validations: |
| # 1. Step 4 should succeed |
| # 2. Step 5 should fail |
| |
| self.account = Account.create(self.apiclient,self.services["account"],domainid=self.domain.id) |
| self.cleanup.append(self.account) |
| |
| if(shouldTestBeSkipped(networkType=value, zoneType=self.mode)): |
| self.skipTest("Skipping test as %s network is not supported in basic zone" % value) |
| |
| network = createNetwork(self, value) |
| |
| try: |
| virtual_machine = VirtualMachine.create(self.apiclient,self.services["virtual_machine"], |
| networkids=[network.id],serviceofferingid=self.service_offering.id, |
| accountid=self.account.name,domainid=self.account.domainid) |
| except Exception as e: |
| self.fail("vm creation failed: %s" % e) |
| |
| try: |
| ipaddress_1 = NIC.addIp(self.apiclient, id=virtual_machine.nic[0].id) |
| except Exception as e: |
| self.fail("Failed while adding secondary IP to NIC of vm %s" % virtual_machine.id) |
| |
| try: |
| NIC.removeIp(self.apiclient, ipaddressid=ipaddress_1.id) |
| except Exception as e: |
| self.fail("Removing seondary IP %s from NIC failed as expected with Exception %s" % (ipaddress_1.id,e)) |
| |
| try: |
| NIC.removeIp(self.apiclient, ipaddressid=(ipaddress_1.id + random_gen())) |
| self.fail("Removing invalid IP address, it should have failed") |
| except Exception as e: |
| self.debug("Removing invalid IP failed as expected with Exception %s" % e) |
| return |
| |
| @attr(tags=["basic","advanced"]) |
| def test_remove_invalid_ip(self): |
| """ Remove invalid ip""" |
| |
| # Steps: |
| # 1. Try to remove secondary ip without passing ip address id |
| |
| # Validations: |
| # 1. Step 1 should fail |
| |
| try: |
| NIC.removeIp(self.apiclient, ipaddressid="") |
| self.fail("Removing IP address without passing IP succeeded, it should have failed") |
| except Exception as e: |
| self.debug("Removing IP from NIC without passing ipaddressid failed as expected with Exception %s" % e) |
| return |
| |
| @data(ISOLATED_NETWORK, SHARED_NETWORK, VPC_NETWORK) |
| @attr(tags=["basic","advanced"]) |
| def test_list_nics(self, value): |
| """Test listing nics associated with the ip address""" |
| |
| # Steps: |
| # 1. Create Account and create network in it (isoalted/ shared/ vpc) |
| # 2. Deploy a VM in this network and account |
| # 3. Add secondary IP to the default nic of VM |
| # 4. Try to list the secondary ips without passing vm id |
| # 5. Try to list secondary IPs by passing correct vm id |
| # 6. Try to list secondary IPs by passing correct vm id and its nic id |
| # 7. Try to list secondary IPs by passing incorrect vm id and correct nic id |
| # 8. Try to list secondary IPs by passing correct vm id and incorrect nic id |
| # 9. Try to list secondary IPs by passing incorrect vm id and incorrect nic id |
| |
| # Validations: |
| # 1. Step 4 should fail |
| # 2. Step 5 should succeed |
| # 3. Step 6 should succeed |
| # 4. Step 7 should fail |
| # 5. Step 8 should fail |
| # 6. Step 9 should fail |
| |
| |
| self.account = Account.create(self.apiclient,self.services["account"],domainid=self.domain.id) |
| self.cleanup.append(self.account) |
| |
| if(shouldTestBeSkipped(networkType=value, zoneType=self.mode)): |
| self.skipTest("Skipping test as %s network is not supported in basic zone" % value) |
| |
| network = createNetwork(self, value) |
| |
| try: |
| virtual_machine = VirtualMachine.create(self.apiclient,self.services["virtual_machine"], |
| networkids=[network.id],serviceofferingid=self.service_offering.id, |
| accountid=self.account.name,domainid=self.account.domainid) |
| except Exception as e: |
| self.fail("vm creation failed: %s" % e) |
| |
| try: |
| NIC.addIp(self.apiclient, id=virtual_machine.nic[0].id) |
| except Exception as e: |
| self.fail("Failed while adding secondary IP to NIC of vm %s" % virtual_machine.id) |
| |
| try: |
| nics = NIC.list(self.apiclient) |
| self.fail("Listing NICs without passign VM id succeeded, it should have failed, list is %s" % nics) |
| except Exception as e: |
| self.debug("Listing NICs without passing virtual machine id failed as expected") |
| |
| try: |
| NIC.list(self.apiclient, virtualmachineid=virtual_machine.id) |
| except Exception as e: |
| self.fail("Listing NICs for virtual machine %s failed with Exception %s" % (virtual_machine.id, e)) |
| |
| try: |
| NIC.list(self.apiclient, virtualmachineid=virtual_machine.id, nicid=virtual_machine.nic[0].id) |
| except Exception as e: |
| self.fail("Listing NICs for virtual machine %s and nic id %s failed with Exception %s" % |
| (virtual_machine.id, virtual_machine.nic[0].id, e)) |
| |
| try: |
| nics = NIC.list(self.apiclient, virtualmachineid=(virtual_machine.id + random_gen()), nicid=virtual_machine.nic[0].id) |
| self.fail("Listing NICs with wrong virtual machine id and right nic id succeeded, should have failed") |
| except Exception as e: |
| self.debug("Listing NICs with wrong virtual machine id and right nic failed as expected with Exception %s" % e) |
| |
| try: |
| nics = NIC.list(self.apiclient, virtualmachineid=virtual_machine.id, nicid=(virtual_machine.nic[0].id + random_gen())) |
| self.fail("Listing NICs with correct virtual machine id but wrong nic id succeeded, should have failed") |
| except Exception as e: |
| self.debug("Listing NICs with correct virtual machine id but wrong nic id failed as expected with Exception %s" % e) |
| |
| try: |
| nics = NIC.list(self.apiclient, virtualmachineid=(virtual_machine.id+random_gen()), nicid=(virtual_machine.nic[0].id + random_gen())) |
| self.fail("Listing NICs with wrong virtual machine id and wrong nic id succeeded, should have failed") |
| except Exception as e: |
| self.debug("Listing NICs with wrong virtual machine id and wrong nic id failed as expected with Exception %s" % e) |
| |
| return |
| |
| @data(ISOLATED_NETWORK, SHARED_NETWORK, VPC_NETWORK) |
| @attr(tags=["basic","advanced"]) |
| def test_operations_non_root_admin_api_client(self, value): |
| """Test basic operations using non root admin apii client""" |
| |
| # Steps: |
| # 1. Create Domain and Account in it |
| # 2. Create network in it (isoalted/ shared/ vpc) |
| # 3. Create User API client of this account |
| # 4. Deploy a VM in this network and account |
| # 5. Add secondary IP to the default nic of VM using non root admin api client |
| # 6. List secondary IPs using non root admin api client |
| # 7. Remove secondary IP using non root admin api client |
| |
| # Validations: |
| # 1. All the operations should be successful |
| |
| child_domain = Domain.create(self.apiclient,services=self.services["domain"], |
| parentdomainid=self.domain.id) |
| |
| self.account = Account.create(self.apiclient,self.services["account"],domainid=child_domain.id) |
| self.cleanup.append(self.account) |
| self.cleanup.append(child_domain) |
| |
| apiclient = self.testClient.createUserApiClient(UserName=self.account.name, DomainName=self.account.domain) |
| |
| if(shouldTestBeSkipped(networkType=value, zoneType=self.mode)): |
| self.skipTest("Skipping test as %s network is not supported in basic zone" % value) |
| |
| network = createNetwork(self, value) |
| |
| try: |
| virtual_machine = VirtualMachine.create(self.apiclient,self.services["virtual_machine"], |
| networkids=[network.id],serviceofferingid=self.service_offering.id, |
| accountid=self.account.name,domainid=self.account.domainid) |
| except Exception as e: |
| self.fail("vm creation failed: %s" % e) |
| |
| try: |
| ipaddress_1 = NIC.addIp(apiclient, id=virtual_machine.nic[0].id) |
| except Exception as e: |
| self.fail("Failed while adding secondary IP to NIC of vm %s" % virtual_machine.id) |
| |
| try: |
| NIC.list(apiclient, virtualmachineid=virtual_machine.id) |
| except Exception as e: |
| self.fail("Listing NICs for virtual machine %s failed with Exception %s" % (virtual_machine.id, e)) |
| |
| try: |
| NIC.list(apiclient, virtualmachineid=virtual_machine.id, nicid=virtual_machine.nic[0].id) |
| except Exception as e: |
| self.fail("Listing NICs for virtual machine %s and nic id %s failed with Exception %s" % |
| (virtual_machine.id, virtual_machine.nic[0].id, e)) |
| |
| try: |
| NIC.removeIp(apiclient, ipaddressid=ipaddress_1.id) |
| except Exception as e: |
| self.fail("Removing seondary IP %s from NIC failed as expected with Exception %s" % (ipaddress_1.id,e)) |
| |
| return |