| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| Tests of network permissions |
| """ |
| |
| import logging |
| |
| from nose.plugins.attrib import attr |
| from marvin.cloudstackTestCase import cloudstackTestCase |
| |
| from marvin.lib.base import (Account, |
| Configurations, |
| Domain, |
| Project, |
| ServiceOffering, |
| VirtualMachine, |
| Zone, |
| Network, |
| NetworkOffering, |
| NetworkPermission, |
| NIC, |
| PublicIPAddress, |
| LoadBalancerRule, |
| NATRule, |
| StaticNATRule, |
| |
| SSHKeyPair) |
| |
| from marvin.lib.common import (get_domain, |
| get_zone, |
| get_suitable_test_template) |
| |
| NETWORK_FILTER_ACCOUNT = 'account' |
| NETWORK_FILTER_DOMAIN = 'domain' |
| NETWORK_FILTER_ACCOUNT_DOMAIN = 'accountdomain' |
| NETWORK_FILTER_SHARED = 'shared' |
| NETWORK_FILTER_ALL = 'all' |
| |
| class TestNetworkPermissions(cloudstackTestCase): |
| """ |
| Test user-shared networks |
| """ |
| @classmethod |
| def setUpClass(cls): |
| cls.testClient = super( |
| TestNetworkPermissions, |
| cls).getClsTestClient() |
| cls.apiclient = cls.testClient.getApiClient() |
| cls.services = cls.testClient.getParsedTestDataConfig() |
| |
| zone = get_zone(cls.apiclient, cls.testClient.getZoneForTests()) |
| cls.zone = Zone(zone.__dict__) |
| cls.hypervisor = cls.testClient.getHypervisorInfo() |
| cls.template = get_suitable_test_template( |
| cls.apiclient, |
| cls.zone.id, |
| cls.services["ostype"], |
| cls.hypervisor) |
| cls._cleanup = [] |
| |
| cls.logger = logging.getLogger("TestNetworkPermissions") |
| cls.stream_handler = logging.StreamHandler() |
| cls.logger.setLevel(logging.DEBUG) |
| cls.logger.addHandler(cls.stream_handler) |
| |
| cls.domain = get_domain(cls.apiclient) |
| |
| # Create small service offering |
| cls.service_offering = ServiceOffering.create( |
| cls.apiclient, |
| cls.services["service_offerings"]["small"] |
| ) |
| cls._cleanup.append(cls.service_offering) |
| |
| # Create network offering for isolated networks |
| cls.network_offering_isolated = NetworkOffering.create( |
| cls.apiclient, |
| cls.services["isolated_network_offering"] |
| ) |
| cls.network_offering_isolated.update(cls.apiclient, state='Enabled') |
| cls._cleanup.append(cls.network_offering_isolated) |
| |
| # Create sub-domain |
| cls.sub_domain = Domain.create( |
| cls.apiclient, |
| cls.services["acl"]["domain1"] |
| ) |
| cls._cleanup.append(cls.sub_domain) |
| |
| # Create domain admin and normal user |
| cls.domain_admin = Account.create( |
| cls.apiclient, |
| cls.services["acl"]["accountD1A"], |
| admin=True, |
| domainid=cls.sub_domain.id |
| ) |
| cls._cleanup.append(cls.domain_admin) |
| |
| cls.network_owner = Account.create( |
| cls.apiclient, |
| cls.services["acl"]["accountD11A"], |
| domainid=cls.sub_domain.id |
| ) |
| cls._cleanup.append(cls.network_owner) |
| |
| cls.other_user = Account.create( |
| cls.apiclient, |
| cls.services["acl"]["accountD11B"], |
| domainid=cls.sub_domain.id |
| ) |
| cls._cleanup.append(cls.other_user) |
| |
| # Create project |
| cls.project = Project.create( |
| cls.apiclient, |
| cls.services["project"], |
| account=cls.domain_admin.name, |
| domainid=cls.domain_admin.domainid |
| ) |
| cls._cleanup.append(cls.project) |
| |
| # Create api clients for domain admin and normal user |
| cls.domainadmin_user = cls.domain_admin.user[0] |
| cls.domainadmin_apiclient = cls.testClient.getUserApiClient( |
| cls.domainadmin_user.username, cls.sub_domain.name |
| ) |
| cls.networkowner_user = cls.network_owner.user[0] |
| cls.user_apiclient = cls.testClient.getUserApiClient( |
| cls.networkowner_user.username, cls.sub_domain.name |
| ) |
| |
| cls.otheruser_user = cls.other_user.user[0] |
| cls.otheruser_apiclient = cls.testClient.getUserApiClient( |
| cls.otheruser_user.username, cls.sub_domain.name |
| ) |
| |
| # Create networks for domain admin, normal user and project |
| cls.services["network"]["name"] = "Test Network Isolated - Project" |
| cls.project_network = Network.create( |
| cls.apiclient, |
| cls.services["network"], |
| networkofferingid=cls.network_offering_isolated.id, |
| domainid=cls.sub_domain.id, |
| projectid=cls.project.id, |
| zoneid=cls.zone.id |
| ) |
| cls._cleanup.append(cls.project_network) |
| |
| cls.services["network"]["name"] = "Test Network Isolated - Domain admin" |
| cls.domainadmin_network = Network.create( |
| cls.apiclient, |
| cls.services["network"], |
| networkofferingid=cls.network_offering_isolated.id, |
| domainid=cls.sub_domain.id, |
| accountid=cls.domain_admin.name, |
| zoneid=cls.zone.id |
| ) |
| cls._cleanup.append(cls.domainadmin_network) |
| |
| cls.services["network"]["name"] = "Test Network Isolated - Normal user" |
| cls.user_network = Network.create( |
| cls.apiclient, |
| cls.services["network"], |
| networkofferingid=cls.network_offering_isolated.id, |
| domainid=cls.sub_domain.id, |
| accountid=cls.network_owner.name, |
| zoneid=cls.zone.id |
| ) |
| cls._cleanup.append(cls.user_network) |
| |
| @classmethod |
| def tearDownClass(cls): |
| super(TestNetworkPermissions, cls).tearDownClass() |
| |
| def setUp(self): |
| self.cleanup = [] |
| self.virtual_machine = None |
| |
| def tearDown(self): |
| super(TestNetworkPermissions, self).tearDown() |
| |
| def list_network(self, apiclient, account, network, project, network_filter=None, expected=True): |
| # List networks by apiclient, account, network, project and network network_filter |
| # If account is specified, list the networks which can be used by the domain (canusefordeploy=true,listall=false) |
| # otherwise canusefordeploy is None and listall is True. |
| domain_id = None |
| account_name = None |
| project_id = None |
| canusefordeploy = None |
| list_all = True |
| if account: |
| domain_id = account.domainid |
| account_name = account.name |
| canusefordeploy = True |
| list_all = False |
| if project: |
| project_id = project.id |
| networks = None |
| try: |
| networks = Network.list( |
| apiclient, |
| canusefordeploy=canusefordeploy, |
| listall=list_all, |
| networkfilter= network_filter, |
| domainid=domain_id, |
| account=account_name, |
| projectid=project_id, |
| id=network.id |
| ) |
| if isinstance(networks, list) and len(networks) > 0: |
| if not expected: |
| self.fail("Found the network, but expected to fail") |
| elif expected: |
| self.fail("Failed to find the network, but expected to succeed") |
| except Exception as ex: |
| networks = None |
| if expected: |
| self.fail(f"Failed to list network, but expected to succeed : {ex}") |
| if networks and not expected: |
| self.fail("network is listed successfully, but expected to fail") |
| |
| def list_network_by_filters(self, apiclient, account, network, project, expected_results=None): |
| # expected results in order: account/domain/accountdomain/shared/all |
| self.list_network(apiclient, account, network, project, NETWORK_FILTER_ACCOUNT, expected_results[0]) |
| self.list_network(apiclient, account, network, project, NETWORK_FILTER_DOMAIN, expected_results[1]) |
| self.list_network(apiclient, account, network, project, NETWORK_FILTER_ACCOUNT_DOMAIN, expected_results[2]) |
| self.list_network(apiclient, account, network, project, NETWORK_FILTER_SHARED, expected_results[3]) |
| self.list_network(apiclient, account, network, project, NETWORK_FILTER_ALL, expected_results[4]) |
| |
| def create_network_permission(self, apiclient, network, account, project, expected=True): |
| account_id = None |
| project_id = None |
| if account: |
| account_id = account.id |
| if project: |
| project_id = project.id |
| result = True |
| try: |
| NetworkPermission.create( |
| apiclient, |
| networkid=network.id, |
| accountids=account_id, |
| projectids=project_id |
| ) |
| except Exception as ex: |
| result = False |
| if expected: |
| self.fail(f"Failed to create network permissions, but expected to succeed : {ex}") |
| if result and not expected: |
| self.fail("network permission is created successfully, but expected to fail") |
| |
| def remove_network_permission(self, apiclient, network, account, project, expected=True): |
| account_id = None |
| project_id = None |
| if account: |
| account_id = account.id |
| if project: |
| project_id = project.id |
| result = True |
| try: |
| NetworkPermission.remove( |
| apiclient, |
| networkid=network.id, |
| accountids=account_id, |
| projectids=project_id |
| ) |
| except Exception as ex: |
| result = False |
| if expected: |
| self.fail(f"Failed to remove network permissions, but expected to succeed : {ex}") |
| if result and not expected: |
| self.fail("network permission is removed successfully, but expected to fail") |
| |
| def reset_network_permission(self, apiclient, network, expected=True): |
| result = True |
| try: |
| NetworkPermission.reset( |
| apiclient, |
| networkid=network.id |
| ) |
| except Exception as ex: |
| result = False |
| if expected: |
| self.fail(f"Failed to reset network permissions, but expected to succeed : {ex}") |
| if result and not expected: |
| self.fail("network permission is reset successfully, but expected to fail") |
| |
| def exec_command(self, apiclient_str, command, expected=None): |
| result = True |
| try: |
| command = command.format(apiclient = apiclient_str) |
| exec(command) |
| except Exception as ex: |
| result = False |
| if expected: |
| self.fail(f"Failed to execute command '{command}' with exception : {ex}") |
| if result and expected is False: |
| self.fail(f"command {command} is executed successfully, but expected to fail") |
| if expected is None: |
| # if expected is None, display the command and result |
| self.logger.info(f"Result of command '{command}' : {result}") |
| return result |
| |
| @attr(tags=["advanced"], required_hardware="false") |
| def test_01_network_permission_on_project_network(self): |
| """ Testing network permissions on project network """ |
| |
| self.create_network_permission(self.apiclient, self.project_network, self.domain_admin, None, expected=False) |
| self.create_network_permission(self.domainadmin_apiclient, self.project_network, self.domain_admin, None, expected=False) |
| self.create_network_permission(self.user_apiclient, self.project_network, self.network_owner, None, expected=False) |
| |
| @attr(tags=["advanced"], required_hardware="false") |
| def test_02_network_permission_on_user_network(self): |
| """ Testing network permissions on user network """ |
| |
| # List user network by domain admin |
| self.list_network_by_filters(self.domainadmin_apiclient, None, self.user_network, None, [True, False, True, False, True]) |
| self.list_network_by_filters(self.domainadmin_apiclient, self.domain_admin, self.user_network, None, [False, False, False, False, False]) |
| |
| # Create network permissions |
| self.create_network_permission(self.apiclient, self.user_network, self.domain_admin, None, expected=True) |
| self.create_network_permission(self.domainadmin_apiclient, self.user_network, self.domain_admin, None, expected=True) |
| self.create_network_permission(self.user_apiclient, self.user_network, self.network_owner, None, expected=True) |
| self.create_network_permission(self.user_apiclient, self.user_network, self.other_user, None, expected=True) |
| self.create_network_permission(self.user_apiclient, self.user_network, None, self.project, expected=False) |
| self.create_network_permission(self.domainadmin_apiclient, self.user_network, None, self.project, expected=True) |
| self.create_network_permission(self.otheruser_apiclient, self.user_network, self.network_owner, None, expected=False) |
| |
| # List domain admin network by domain admin |
| self.list_network_by_filters(self.domainadmin_apiclient, None, self.domainadmin_network, None, [True, False, True, False, True]) |
| self.list_network_by_filters(self.domainadmin_apiclient, self.domain_admin, self.domainadmin_network, None, [True, False, True, False, True]) |
| # List user network by domain admin |
| self.list_network_by_filters(self.domainadmin_apiclient, None, self.user_network, None, [True, False, True, True, True]) |
| self.list_network_by_filters(self.domainadmin_apiclient, self.domain_admin, self.user_network, None, [False, False, False, True, True]) |
| # List user network by user |
| self.list_network_by_filters(self.user_apiclient, None, self.user_network, None, [True, False, True, False, True]) |
| self.list_network_by_filters(self.user_apiclient, self.network_owner, self.user_network, None, [True, False, True, False, True]) |
| # List user network by other user |
| self.list_network_by_filters(self.otheruser_apiclient, None, self.user_network, None, [False, False, False, True, True]) |
| self.list_network_by_filters(self.otheruser_apiclient, self.network_owner, self.user_network, None, [False, False, False, False, False]) |
| |
| # Remove network permissions |
| self.remove_network_permission(self.domainadmin_apiclient, self.user_network, self.domain_admin, None, expected=True) |
| # List user network by domain admin |
| self.list_network_by_filters(self.domainadmin_apiclient, None, self.user_network, None, [True, False, True, True, True]) |
| self.list_network_by_filters(self.domainadmin_apiclient, self.domain_admin, self.user_network, None, [False, False, False, False, False]) |
| |
| # Reset network permissions |
| self.reset_network_permission(self.domainadmin_apiclient, self.user_network, expected=True) |
| # List user network by domain admin |
| self.list_network_by_filters(self.domainadmin_apiclient, None, self.user_network, None, [True, False, True, False, True]) |
| self.list_network_by_filters(self.domainadmin_apiclient, self.domain_admin, self.user_network, None, [False, False, False, False, False]) |
| |
| @attr(tags=["advanced"], required_hardware="false") |
| def test_03_network_operations_on_created_vm_of_otheruser(self): |
| """ Testing network operations on a create vm owned by other user""" |
| |
| # 1. Create an Isolated network by other user |
| self.services["network"]["name"] = "Test Network Isolated - Other user" |
| otheruser_network = Network.create( |
| self.otheruser_apiclient, |
| self.services["network"], |
| networkofferingid=self.network_offering_isolated.id, |
| zoneid=self.zone.id |
| ) |
| self.cleanup.append(otheruser_network) |
| |
| # 2. Deploy vm1 on other user's network |
| self.virtual_machine = VirtualMachine.create( |
| self.otheruser_apiclient, |
| self.services["virtual_machine"], |
| templateid=self.template.id, |
| serviceofferingid=self.service_offering.id, |
| networkids=otheruser_network.id, |
| zoneid=self.zone.id |
| ) |
| |
| # 3. Add user network to vm1, should fail by vm owner and network owner |
| command = """self.virtual_machine.add_nic({apiclient}, self.user_network.id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| |
| # 4. Create network permission for other user, should succeed by network owner |
| command = """self.create_network_permission({apiclient}, self.user_network, self.other_user, None, expected=True)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| |
| # 5. Add user network to vm1, should succeed by vm owner |
| command = """self.virtual_machine.add_nic({apiclient}, self.user_network.id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 6. Stop vm1 with forced=true, should succeed by vm owner |
| command = """self.virtual_machine.stop({apiclient}, forced=True)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # Get id of the additional nic |
| list_vms = VirtualMachine.list( |
| self.otheruser_apiclient, |
| id = self.virtual_machine.id |
| ) |
| self.assertEqual( |
| isinstance(list_vms, list), |
| True, |
| "Check if virtual machine is present" |
| ) |
| self.assertEqual( |
| len(list_vms) > 0, |
| True, |
| "Check if virtual machine list is empty" |
| ) |
| self.vm_default_nic_id = None |
| self.vm_new_nic_id = None |
| for vm_nic in list_vms[0].nic: |
| if vm_nic.networkid == self.user_network.id: |
| self.vm_new_nic_id = vm_nic.id |
| else: |
| self.vm_default_nic_id = vm_nic.id |
| |
| # 6. Update vm1 nic IP, should succeed by vm owner |
| command = """NIC.updateIp({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 7. Start vm1, should succeed by vm owner |
| command = """self.virtual_machine.start({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 8. Add secondary IP to nic, should succeed by vm owner |
| command = """self.secondaryip = NIC.addIp({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 9 Remove secondary IP from nic, should succeed by vm owner |
| command = """NIC.removeIp({apiclient}, self.secondaryip.id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 10. Update default NIC, should succeed by vm owner |
| command = """self.virtual_machine.update_default_nic({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.update_default_nic({apiclient}, self.vm_default_nic_id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 11. Stop vm1 with forced=true |
| command = """self.virtual_machine.stop({apiclient}, forced=True)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 12. Remove nic from vm1, should succeed by vm owner |
| command = """self.virtual_machine.remove_nic({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 13. Test operations by domain admin |
| command = """self.virtual_machine.add_nic({apiclient}, self.user_network.id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| list_vms = VirtualMachine.list( |
| self.otheruser_apiclient, |
| id = self.virtual_machine.id |
| ) |
| |
| self.vm_default_nic_id = None |
| self.vm_new_nic_id = None |
| for vm_nic in list_vms[0].nic: |
| if vm_nic.networkid == self.user_network.id: |
| self.vm_new_nic_id = vm_nic.id |
| else: |
| self.vm_default_nic_id = vm_nic.id |
| |
| command = """NIC.updateIp({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| command = """self.secondaryip = NIC.addIp({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| command = """NIC.removeIp({apiclient}, self.secondaryip.id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.update_default_nic({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.update_default_nic({apiclient}, self.vm_default_nic_id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.remove_nic({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 14. Test operations by vm owner, when network permission is removed |
| command = """self.virtual_machine.add_nic({apiclient}, self.user_network.id)""" |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 15. Reset network permissions, should succeed by network owner |
| command = """self.reset_network_permission({apiclient}, self.user_network, expected=True)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| |
| list_vms = VirtualMachine.list( |
| self.otheruser_apiclient, |
| id = self.virtual_machine.id |
| ) |
| |
| self.vm_default_nic_id = None |
| self.vm_new_nic_id = None |
| for vm_nic in list_vms[0].nic: |
| if vm_nic.networkid == self.user_network.id: |
| self.vm_new_nic_id = vm_nic.id |
| else: |
| self.vm_default_nic_id = vm_nic.id |
| |
| command = """NIC.updateIp({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| command = """self.secondaryip = NIC.addIp({apiclient}, self.vm_new_nic_id)""" |
| if self.exec_command("self.otheruser_apiclient", command, expected=True): |
| command = """NIC.removeIp({apiclient}, self.secondaryip.id)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.update_default_nic({apiclient}, self.vm_new_nic_id)""" |
| if self.exec_command("self.otheruser_apiclient", command, expected=True): |
| command = """self.virtual_machine.update_default_nic({apiclient}, self.vm_default_nic_id)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.start({apiclient})""" |
| if self.exec_command("self.otheruser_apiclient", command, expected=True): |
| command = """self.virtual_machine.stop({apiclient}, forced=True)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| command = """self.virtual_machine.remove_nic({apiclient}, self.vm_new_nic_id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 16. Destroy vm1, should succeed by root admin |
| self.virtual_machine.delete(self.apiclient, expunge=True) |
| |
| @attr(tags=["advanced"], required_hardware="false") |
| def test_04_deploy_vm_for_other_user_and_test_vm_operations(self): |
| """ Deploy VM for other user and test VM operations by vm owner, network owner and domain admin""" |
| |
| # 1. Create network permission for other user, by user |
| command = """self.create_network_permission({apiclient}, self.user_network, self.other_user, None, expected=True)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| |
| # 2. Deploy vm2 on user network |
| command = """self.virtual_machine = VirtualMachine.create( |
| {apiclient}, |
| self.services["virtual_machine"], |
| templateid=self.template.id, |
| serviceofferingid=self.service_offering.id, |
| networkids=self.user_network.id, |
| accountid=self.other_user.name, |
| domainid=self.other_user.domainid, |
| zoneid=self.zone.id |
| )""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| if not self.virtual_machine: |
| self.fail("Failed to find self.virtual_machine") |
| |
| # 3. List vm2 |
| list_vms = VirtualMachine.list( |
| self.user_apiclient, |
| id = self.virtual_machine.id |
| ) |
| self.assertEqual( |
| isinstance(list_vms, list) and len(list_vms) > 0, |
| False, |
| "Check if virtual machine is not present" |
| ) |
| list_vms = VirtualMachine.list( |
| self.otheruser_apiclient, |
| id = self.virtual_machine.id |
| ) |
| self.assertEqual( |
| isinstance(list_vms, list) and len(list_vms) > 0, |
| True, |
| "Check if virtual machine is present" |
| ) |
| |
| # 4. Stop vm2 with forced=true |
| command = """self.virtual_machine.stop({apiclient}, forced=True)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 5. Reset vm password |
| if self.template.passwordenabled: |
| command = """self.virtual_machine.resetPassword({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 6. Reset vm SSH key |
| self.keypair = SSHKeyPair.create( |
| self.otheruser_apiclient, |
| name=self.other_user.name + ".pem" |
| ) |
| command = """self.virtual_machine.resetSshKey({apiclient}, keypair=self.keypair.name)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 7. Start vm2 |
| command = """self.virtual_machine.start({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 8. Acquire public IP, should succeed by domain admin and network owner |
| command = """self.public_ip = PublicIPAddress.create( |
| {apiclient}, |
| zoneid=self.zone.id, |
| networkid=self.user_network.id |
| )""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 9. Enable static nat, should succeed by domain admin |
| command = """StaticNATRule.enable( |
| {apiclient}, |
| ipaddressid=self.public_ip.ipaddress.id, |
| virtualmachineid=self.virtual_machine.id |
| )""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 10. Disable static nat, should succeed by domain admin and network owner |
| command = """StaticNATRule.disable( |
| {apiclient}, |
| ipaddressid=self.public_ip.ipaddress.id |
| )""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 11. Create port forwarding rule, should succeed by domain admin |
| command = """self.port_forwarding_rule = NATRule.create( |
| {apiclient}, |
| virtual_machine=self.virtual_machine, |
| services=self.services["natrule"], |
| ipaddressid=self.public_ip.ipaddress.id, |
| )""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 12. Delete port forwarding rule, should succeed by domain admin and network owner |
| command = """self.port_forwarding_rule.delete({apiclient})""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 13. Create load balancer rule, should succeed by domain admin and network owner |
| command = """self.load_balancer_rule = LoadBalancerRule.create( |
| {apiclient}, |
| self.services["lbrule"], |
| ipaddressid=self.public_ip.ipaddress.id, |
| networkid=self.user_network.id, |
| )""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 14. Assign virtual machine to load balancing rule, should succeed by domain admin |
| command = """self.load_balancer_rule.assign({apiclient}, vms=[self.virtual_machine])""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 15. Remove virtual machine from load balancing rule, should succeed by domain admin and network owner |
| command = """self.load_balancer_rule.remove({apiclient}, vms=[self.virtual_machine])""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 16. Delete load balancing rule, should succeed by domain admin and network owner |
| command = """self.load_balancer_rule.delete({apiclient})""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 17. Release public IP, should succeed by domain admin and network owner |
| command = """self.public_ip.delete({apiclient})""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| #self.exec_command("self.domainadmin_apiclient", command, expected=True) |
| |
| # 18. Stop vm2 with forced=true, should succeed by vm owner |
| command = """self.virtual_machine.stop({apiclient}, forced=True)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 19. Update vm2, should succeed by vm owner |
| command = """self.virtual_machine.update({apiclient}, displayname = self.virtual_machine.displayname + ".new")""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 20. Restore vm2, should succeed by vm owner |
| command = """self.virtual_machine.restore({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 21. Scale vm2 to another offering, should succeed by vm owner |
| self.service_offering_new = ServiceOffering.create( |
| self.apiclient, |
| self.services["service_offerings"]["big"] |
| ) |
| self.cleanup.append(self.service_offering_new) |
| command = """self.virtual_machine.scale_virtualmachine({apiclient}, self.service_offering_new.id)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 22. Start VM before destroying, to recreate ROOT volume that was deleted as part of restore operation |
| command = """self.virtual_machine.start({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 23. Destroy vm2, should succeed by vm owner |
| command = """self.virtual_machine.delete({apiclient}, expunge=False)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 24. Recover vm2, should succeed by vm owner |
| allow_expunge_recover_vm = Configurations.list(self.apiclient, name="allow.user.expunge.recover.vm")[0].value |
| self.logger.debug("Global configuration allow.user.expunge.recover.vm = %s", allow_expunge_recover_vm) |
| if allow_expunge_recover_vm == "true": |
| command = """self.virtual_machine.recover({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 25. Destroy vm2, should succeed by vm owner |
| command = """self.virtual_machine.delete({apiclient}, expunge=False)""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| |
| # 26. Expunge vm2, should succeed by vm owner |
| if allow_expunge_recover_vm == "true": |
| command = """self.virtual_machine.expunge({apiclient})""" |
| self.exec_command("self.user_apiclient", command, expected=False) |
| self.exec_command("self.otheruser_apiclient", command, expected=True) |
| else: |
| self.virtual_machine.expunge(self.apiclient) |
| |
| # 27. Reset network permissions, should succeed by network owner |
| command = """self.reset_network_permission({apiclient}, self.user_network, expected=True)""" |
| self.exec_command("self.otheruser_apiclient", command, expected=False) |
| self.exec_command("self.user_apiclient", command, expected=True) |
| |
| @attr(tags=["advanced"], required_hardware="false") |
| def test_05_list_networks_under_project(self): |
| """ Testing list networks under a project """ |
| self.create_network_permission(self.apiclient, self.user_network, self.domain_admin, self.project, expected=True) |
| self.list_network(self.apiclient, self.domain_admin, self.user_network, self.project, None, expected=True) |
| |
| self.remove_network_permission(self.apiclient, self.user_network, self.domain_admin, self.project, expected=True) |
| self.list_network(self.apiclient, self.domain_admin, self.user_network, self.project, None, expected=False) |
| |
| @attr(tags=["advanced"], required_hardware="false") |
| def test_06_list_networks_under_account(self): |
| """ Testing list networks under a domain admin account and user account """ |
| self.create_network_permission(self.apiclient, self.user_network, self.domain_admin, None, expected=True) |
| self.list_network(self.apiclient, self.domain_admin, self.user_network, None, None, expected=True) |
| self.list_network(self.domainadmin_apiclient, self.domain_admin, self.user_network, None, None, expected=True) |
| self.list_network(self.user_apiclient, self.domain_admin, self.user_network, None, None, expected=False) |
| |
| self.remove_network_permission(self.apiclient, self.user_network, self.domain_admin, None, expected=True) |
| self.list_network(self.apiclient, self.domain_admin, self.user_network, None, None, expected=False) |
| self.list_network(self.domainadmin_apiclient, self.domain_admin, self.user_network, None, None, expected=False) |
| |
| self.create_network_permission(self.apiclient, self.user_network, self.other_user, None, expected=True) |
| self.list_network(self.apiclient, self.other_user, self.user_network, None, None, expected=True) |
| self.list_network(self.otheruser_apiclient, self.other_user, self.user_network, None, None, expected=True) |
| |
| self.remove_network_permission(self.apiclient, self.user_network, self.other_user, None, expected=True) |
| self.list_network(self.apiclient, self.other_user, self.user_network, None, None, expected=False) |
| self.list_network(self.otheruser_apiclient, self.other_user, self.user_network, None, None, expected=False) |