| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| """ BVT tests for Virtual Machine IAM effect |
| """ |
| #Import Local Modules |
| import marvin |
| from marvin.cloudstackTestCase import * |
| from marvin.cloudstackAPI import * |
| from marvin.integration.lib.utils import * |
| from marvin.integration.lib.base import * |
| from marvin.integration.lib.common import * |
| from nose.plugins.attrib import attr |
| #Import System modules |
| import time |
| |
| _multiprocess_shared_ = True |
| class Services: |
| """Test VM Life Cycle Services |
| """ |
| |
| def __init__(self): |
| self.services = { |
| #data for domains and accounts |
| "domain1": { |
| "name": "Domain1", |
| }, |
| "account1A": { |
| "email": "test1A@test.com", |
| "firstname": "test1A", |
| "lastname": "User", |
| "username": "test1A", |
| "password": "password", |
| }, |
| "account1B": { |
| "email": "test1B@test.com", |
| "firstname": "test1B", |
| "lastname": "User", |
| "username": "test1B", |
| "password": "password", |
| }, |
| "domain2": { |
| "name": "Domain2", |
| }, |
| "account2A": { |
| "email": "test2A@test.com", |
| "firstname": "test2A", |
| "lastname": "User", |
| "username": "test2A", |
| "password": "password", |
| }, |
| #data reqd for virtual machine creation |
| "virtual_machine1A" : { |
| "name" : "test1Avm", |
| "displayname" : "Test1A VM", |
| }, |
| "virtual_machine1B" : { |
| "name" : "test1Bvm", |
| "displayname" : "Test1B VM", |
| }, |
| "virtual_machine2A" : { |
| "name" : "test2Avm", |
| "displayname" : "Test2A VM", |
| }, |
| #small service offering |
| "service_offering": { |
| "small": { |
| "name": "Small Instance", |
| "displaytext": "Small Instance", |
| "cpunumber": 1, |
| "cpuspeed": 100, |
| "memory": 128, |
| }, |
| }, |
| "ostype": 'CentOS 5.6 (64-bit)', |
| # iam group and policy information |
| "service_desk_iam_grp" : { |
| "name" : "Service Desk", |
| "description" : "Service Desk IAM Group" |
| }, |
| "vm_readonly_iam_policy" : { |
| "name" : "VM Read Only Access", |
| "description" : "VM read only access iam policy" |
| }, |
| } |
| |
| |
| |
| class TestVMIam(cloudstackTestCase): |
| |
| @classmethod |
| def setUpClass(self): |
| self.apiclient = super(TestVMIam, self).getClsTestClient().getApiClient() |
| self.services = Services().services |
| |
| # backup default apikey and secretkey |
| self.default_apikey = self.apiclient.connection.apiKey |
| self.default_secretkey = self.apiclient.connection.securityKey |
| |
| # Create domains and accounts etc |
| self.domain_1 = Domain.create( |
| self.apiclient, |
| self.services["domain1"] |
| ) |
| self.domain_2 = Domain.create( |
| self.apiclient, |
| self.services["domain2"] |
| ) |
| # Create two accounts for doamin_1 |
| self.account_1A = Account.create( |
| self.apiclient, |
| self.services["account1A"], |
| admin=False, |
| domainid=self.domain_1.id |
| ) |
| |
| self.account_1B = Account.create( |
| self.apiclient, |
| self.services["account1B"], |
| admin=False, |
| domainid=self.domain_1.id |
| ) |
| |
| # Create an account for domain_2 |
| self.account_2A = Account.create( |
| self.apiclient, |
| self.services["account2A"], |
| admin=False, |
| domainid=self.domain_2.id |
| ) |
| |
| # Fetch user details to register apiKey for them |
| self.user_1A = User.list( |
| self.apiclient, |
| account=self.account_1A.name, |
| domainid=self.account_1A.domainid |
| )[0] |
| |
| user_1A_key = User.registerUserKeys( |
| self.apiclient, |
| self.user_1A.id |
| ) |
| self.user_1A_apikey = user_1A_key.apikey |
| self.user_1A_secretkey = user_1A_key.secretkey |
| |
| |
| self.user_1B = User.list( |
| self.apiclient, |
| account=self.account_1B.name, |
| domainid=self.account_1B.domainid |
| )[0] |
| |
| user_1B_key = User.registerUserKeys( |
| self.apiclient, |
| self.user_1B.id |
| ) |
| |
| self.user_1B_apikey = user_1B_key.apikey |
| self.user_1B_secretkey = user_1B_key.secretkey |
| |
| |
| self.user_2A = User.list( |
| self.apiclient, |
| account=self.account_2A.name, |
| domainid=self.account_2A.domainid |
| )[0] |
| |
| user_2A_key = User.registerUserKeys( |
| self.apiclient, |
| self.user_2A.id |
| ) |
| self.user_2A_apikey = user_2A_key.apikey |
| self.user_2A_secretkey = user_2A_key.secretkey |
| |
| # create service offering |
| self.service_offering = ServiceOffering.create( |
| self.apiclient, |
| self.services["service_offering"]["small"] |
| ) |
| |
| self.zone = get_zone(self.apiclient, self.services) |
| self.services['mode'] = self.zone.networktype |
| self.template = get_template(self.apiclient, self.zone.id, self.services["ostype"]) |
| |
| # deploy 3 VMs for three accounts |
| self.virtual_machine_1A = VirtualMachine.create( |
| self.apiclient, |
| self.services["virtual_machine1A"], |
| accountid=self.account_1A.name, |
| zoneid=self.zone.id, |
| domainid=self.account_1A.domainid, |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id |
| ) |
| |
| self.virtual_machine_1B = VirtualMachine.create( |
| self.apiclient, |
| self.services["virtual_machine1B"], |
| accountid=self.account_1B.name, |
| zoneid=self.zone.id, |
| domainid=self.account_1B.domainid, |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id |
| ) |
| |
| self.virtual_machine_2A = VirtualMachine.create( |
| self.apiclient, |
| self.services["virtual_machine2A"], |
| accountid=self.account_2A.name, |
| zoneid=self.zone.id, |
| domainid=self.account_2A.domainid, |
| serviceofferingid=self.service_offering.id, |
| templateid=self.template.id |
| ) |
| |
| self.srv_desk_grp = IAMGroup.create( |
| self.apiclient, |
| self.services["service_desk_iam_grp"] |
| ) |
| |
| self.vm_read_policy = IAMPolicy.create( |
| self.apiclient, |
| self.services["vm_readonly_iam_policy"] |
| ) |
| |
| self.srv_desk_grp.attachPolicy( |
| self.apiclient, [self.vm_read_policy] |
| ) |
| |
| vm_grant_policy_params = {} |
| vm_grant_policy_params['name'] = "policyGrantVirtualMachine" + self.virtual_machine_1A.id |
| vm_grant_policy_params['description'] = "Policy to grant permission to VirtualMachine " + self.virtual_machine_1A.id |
| self.vm_grant_policy = IAMPolicy.create( |
| self.apiclient, |
| vm_grant_policy_params |
| ) |
| |
| self._cleanup = [ |
| self.account_1A, |
| self.account_1B, |
| self.domain_1, |
| self.account_2A, |
| self.domain_2, |
| self.service_offering, |
| self.vm_read_policy, |
| self.srv_desk_grp, |
| self.vm_grant_policy |
| ] |
| |
| @classmethod |
| def tearDownClass(self): |
| self.apiclient = super(TestVMIam, self).getClsTestClient().getApiClient() |
| cleanup_resources(self.apiclient, self._cleanup) |
| return |
| |
| def setUp(self): |
| self.apiclient = self.testClient.getApiClient() |
| self.dbclient = self.testClient.getDbConnection() |
| self.cleanup = [] |
| |
| def tearDown(self): |
| # restore back default apikey and secretkey |
| self.apiclient.connection.apiKey = self.default_apikey |
| self.apiclient.connection.securityKey = self.default_secretkey |
| cleanup_resources(self.apiclient, self.cleanup) |
| return |
| |
| |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_01_list_own_vm(self): |
| # listVM command should return owne's VM |
| |
| self.debug("Listing VM for account: %s" % self.account_1A.name) |
| |
| self.apiclient.connection.apiKey = self.user_1A_apikey |
| self.apiclient.connection.securityKey = self.user_1A_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 1, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| self.assertEqual( |
| list_vm_response[0].name, |
| self.virtual_machine_1A.name, |
| "Virtual Machine names do not match" |
| ) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.name) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 1, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| self.assertEqual( |
| list_vm_response[0].name, |
| self.virtual_machine_1B.name, |
| "Virtual Machine names do not match" |
| ) |
| |
| self.debug("Listing VM for account: %s" % self.account_2A.name) |
| |
| self.apiclient.connection.apiKey = self.user_2A_apikey |
| self.apiclient.connection.securityKey = self.user_2A_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 1, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| self.assertEqual( |
| list_vm_response[0].name, |
| self.virtual_machine_2A.name, |
| "Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_02_grant_domain_vm(self): |
| |
| # Validate the following |
| # 1. Grant domain2 VM access to account_1B |
| # 2. listVM command should return account_1B and domain_2 VMs. |
| |
| self.debug("Granting Domain %s VM read only access to account: %s" % (self.domain_2.name, self.account_1B.name)) |
| |
| self.srv_desk_grp.addAccount(self.apiclient, [self.account_1B]) |
| domain_permission = {} |
| domain_permission['action'] = "listVirtualMachines" |
| domain_permission['entitytype'] = "VirtualMachine" |
| domain_permission['scope'] = "DOMAIN" |
| domain_permission['scopeid'] = self.domain_2.id |
| self.vm_read_policy.addPermission(self.apiclient, domain_permission) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.name) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 2, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| list_vm_names = [list_vm_response[0].name, list_vm_response[1].name] |
| |
| self.assertEqual( self.virtual_machine_1B.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| self.assertEqual( self.virtual_machine_2A.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_03_grant_account_vm(self): |
| |
| # Validate the following |
| # 1. Grant account_1A VM access to account_1B |
| # 2. listVM command should return account_1A and account_1B VMs. |
| |
| self.debug("Granting Account %s VM read only access to account: %s" % (self.account_1A.name, self.account_1B.name)) |
| |
| account_permission = {} |
| account_permission['action'] = "listVirtualMachines" |
| account_permission['entitytype'] = "VirtualMachine" |
| account_permission['scope'] = "ACCOUNT" |
| account_permission['scopeid'] = self.account_1A.id |
| self.vm_read_policy.addPermission(self.apiclient, account_permission) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.name) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 3, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| list_vm_names = [list_vm_response[0].name, list_vm_response[1].name, list_vm_response[2].name] |
| |
| self.assertEqual( self.virtual_machine_1B.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| self.assertEqual( self.virtual_machine_1A.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| self.assertEqual( self.virtual_machine_2A.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_04_revoke_account_vm(self): |
| |
| # Validate the following |
| # 1. Revoke account_1A VM access from account_1B |
| # 2. listVM command should not return account_1A VMs. |
| |
| self.debug("Revoking Account %s VM read only access from account: %s" % (self.account_1A.name, self.account_1B.name)) |
| |
| account_permission = {} |
| account_permission['action'] = "listVirtualMachines" |
| account_permission['entitytype'] = "VirtualMachine" |
| account_permission['scope'] = "ACCOUNT" |
| account_permission['scopeid'] = self.account_1A.id |
| self.vm_read_policy.removePermission(self.apiclient, account_permission) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.name) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 2, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| list_vm_names = [list_vm_response[0].name, list_vm_response[1].name] |
| |
| |
| self.assertEqual( self.virtual_machine_1A.name in list_vm_names, |
| False, |
| "Accessible Virtual Machine names do not match" |
| ) |
| return |
| |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_05_revoke_domain_vm(self): |
| |
| # Validate the following |
| # 1. Revoke account_1A VM access from account_1B |
| # 2. listVM command should not return account_1A VMs. |
| |
| self.debug("Revoking Domain %s VM read only access from account: %s" % (self.domain_1.name, self.account_1B.name)) |
| |
| domain_permission = {} |
| domain_permission['action'] = "listVirtualMachines" |
| domain_permission['entitytype'] = "VirtualMachine" |
| domain_permission['scope'] = "DOMAIN" |
| domain_permission['scopeid'] = self.domain_2.id |
| self.vm_read_policy.removePermission(self.apiclient, domain_permission) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.name) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 1, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| self.assertEqual( |
| list_vm_response[0].name, |
| self.virtual_machine_1B.name, |
| "Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_06_grant_resource_vm(self): |
| |
| # Validate the following |
| # 1. Grant a particular vm access to account_1B |
| # 2. listVM command should return account_1B VMs and granted VM. |
| |
| self.debug("Granting VM %s read only access to account: %s" % (self.virtual_machine_1A.name, self.account_1B.name)) |
| |
| res_permission = {} |
| res_permission['action'] = "listVirtualMachines" |
| res_permission['entitytype'] = "VirtualMachine" |
| res_permission['scope'] = "RESOURCE" |
| res_permission['scopeid'] = self.virtual_machine_1A.id |
| self.vm_read_policy.addPermission(self.apiclient, res_permission) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.name) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 2, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| list_vm_names = [list_vm_response[0].name, list_vm_response[1].name] |
| |
| self.assertEqual( self.virtual_machine_1B.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| self.assertEqual( self.virtual_machine_1A.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_07_revoke_resource_vm(self): |
| |
| # Validate the following |
| # 1. Grant a particular vm access to account_1B |
| # 2. listVM command should return account_1B VMs and granted VM. |
| |
| self.debug("Revoking VM %s read only access from account: %s" % (self.virtual_machine_1A.name, self.account_1B.name)) |
| |
| res_permission = {} |
| res_permission['action'] = "listVirtualMachines" |
| res_permission['entitytype'] = "VirtualMachine" |
| res_permission['scope'] = "RESOURCE" |
| res_permission['scopeid'] = self.virtual_machine_1A.id |
| self.vm_read_policy.removePermission(self.apiclient, res_permission) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.id) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 1, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| self.assertEqual( |
| list_vm_response[0].name, |
| self.virtual_machine_1B.name, |
| "Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_08_policy_attach_account(self): |
| |
| # Validate the following |
| # 1. Grant a particular vm access to account_1B by directly attaching policy to account |
| # 2. listVM command should return account_1B VMs and granted VM. |
| |
| self.debug("Granting VM %s read only access to account: %s by attaching policy to account" % (self.virtual_machine_1A.name, self.account_1B.name)) |
| |
| res_permission = {} |
| res_permission['action'] = "listVirtualMachines" |
| res_permission['entitytype'] = "VirtualMachine" |
| res_permission['scope'] = "RESOURCE" |
| res_permission['scopeid'] = self.virtual_machine_1A.id |
| self.vm_grant_policy.addPermission(self.apiclient, res_permission) |
| self.vm_grant_policy.attachAccount(self.apiclient, [self.account_1B]) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.id) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 2, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| list_vm_names = [list_vm_response[0].name, list_vm_response[1].name] |
| |
| self.assertEqual( self.virtual_machine_1B.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| self.assertEqual( self.virtual_machine_1A.name in list_vm_names, |
| True, |
| "Accessible Virtual Machine names do not match" |
| ) |
| |
| return |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"]) |
| def test_09_policy_detach_account(self): |
| |
| # Validate the following |
| # 1. Revoking a particular vm access from account_1B by detaching policy from account |
| # 2. listVM command should return account_1B VMs. |
| |
| self.debug("Revoking VM %s read only access from account: %s by detaching policy from account" % (self.virtual_machine_1A.name, self.account_1B.name)) |
| |
| self.vm_grant_policy.detachAccount(self.apiclient, [self.account_1B]) |
| |
| self.debug("Listing VM for account: %s" % self.account_1B.id) |
| self.apiclient.connection.apiKey = self.user_1B_apikey |
| self.apiclient.connection.securityKey = self.user_1B_secretkey |
| list_vm_response = list_virtual_machines( |
| self.apiclient |
| ) |
| self.assertEqual( |
| isinstance(list_vm_response, list), |
| True, |
| "Check list response returns a valid list" |
| ) |
| self.assertEqual( |
| len(list_vm_response), |
| 1, |
| "Check VM available in List Virtual Machines" |
| ) |
| |
| self.assertEqual( |
| list_vm_response[0].name, |
| self.virtual_machine_1B.name, |
| "Virtual Machine names do not match" |
| ) |
| |
| return |