# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from marvin.cloudstackException import CloudstackAPIException
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.base import (Network, NetworkACLList, NetworkOffering, VpcOffering, VPC, NetworkACL)
from marvin.lib.common import (get_domain, get_zone)
from marvin.lib.utils import cleanup_resources
from nose.plugins.attrib import attr
| |
| |
class Services:
    """Static test data for the global ACL tests.

    ``self.services`` maps a short key to the settings the tests read:
    the names of the two domains, the username and role type of each
    account the tests act as, the VPC and VPC tier definitions, and the
    rule that is created on an ACL list.
    """

    def __init__(self):
        self.services = {
            # Domains the API clients are requested for.
            "root_domain": {
                "name": "ROOT",
            },
            "domain": {
                "name": "Domain",
            },
            # Accounts: username plus the role type handed to the test client.
            "user": {
                "username": "user",
                "roletype": 0,
            },
            "domain_admin": {
                "username": "Domain admin",
                "roletype": 2,
            },
            "root_admin": {
                "username": "Root admin",
                "roletype": 1,
            },
            # VPC and the tier created inside it.
            "vpc": {
                "name": "vpc-networkacl",
                "displaytext": "vpc-networkacl",
                "cidr": "10.1.1.0/24",
            },
            "vpcnetwork": {
                "name": "vpcnetwork",
                "displaytext": "vpcnetwork",
            },
            # ACL rule created by the rule tests.
            "rule": {
                "protocol": "all",
                "traffictype": "ingress",
            },
        }
| |
| |
class TestGlobalACLs(cloudstackTestCase):
    """Verify which account types may manage global network ACLs.

    Each test drives the same operation through three API clients (normal
    user, domain admin, root admin) and checks who is allowed to perform it.
    The ACL lists are created without a VPC; the expected error messages
    refer to these as "global" ACLs.
    """

    @classmethod
    def setUpClass(cls):
        """Fetch the shared API client, test data, domain and zone once."""
        cls.testClient = super(TestGlobalACLs, cls).getClsTestClient()
        cls.apiclient = cls.testClient.getApiClient()

        cls.services = Services().services
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, cls.testClient.getZoneForTests())

    def _api_client_for(self, account_key, domain_key):
        """Return an API client for the account/domain described in the test data.

        :param account_key: key in ``self.services`` holding ``username`` and ``roletype``
        :param domain_key: key in ``self.services`` holding the domain ``name``
        """
        account = self.services[account_key]
        return self.testClient.getUserApiClient(
            account["username"],
            self.services[domain_key]["name"],
            account["roletype"],
        )

    def setUp(self):
        """Create one API client per account type and reset the cleanup list."""
        self.user_apiclient = self._api_client_for("user", "domain")
        self.domain_admin_apiclient = self._api_client_for("domain_admin", "domain")
        self.admin_apiclient = self._api_client_for("root_admin", "root_domain")

        # Resources created by a test are collected here.
        # NOTE(review): presumably consumed by cloudstackTestCase.tearDown - confirm.
        self.cleanup = []

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestGlobalACLs, self).tearDown()

    @attr(tags=["advanced", "basic"], required_hardware="false")
    def test_create_global_acl(self):
        """ Test create global ACL as a normal user, domain admin and root admin users.

        Creation is expected to be rejected for the normal user and the
        domain admin and to succeed for the root admin.
        """
        denied = "Only Root Admin can create global ACLs."

        self.debug("Creating ACL list as a normal user, should raise exception.")
        with self.assertRaisesRegex(CloudstackAPIException, denied):
            NetworkACLList.create(apiclient=self.user_apiclient, services={},
                                  name="acl", description="acl")

        self.debug("Creating ACL list as a domain admin, should raise exception.")
        with self.assertRaisesRegex(CloudstackAPIException, denied):
            NetworkACLList.create(apiclient=self.domain_admin_apiclient, services={},
                                  name="acl", description="acl")

        self.debug("Creating ACL list as a root admin, should work.")
        acl = NetworkACLList.create(apiclient=self.admin_apiclient, services={},
                                    name="acl", description="acl")
        # Assert first so that a None result is never registered for cleanup.
        self.assertIsNotNone(acl, "A root admin user should be able to create a global ACL.")
        self.cleanup.append(acl)

    @attr(tags=["advanced", "basic"], required_hardware="false")
    def test_replace_acl_of_network(self):
        """ Test to replace ACL of a VPC tier as a normal user, domain admin and root admin users.

        All three account types are expected to be able to replace the ACL
        list of the tier; any exception raised by a replace call fails the test.
        """
        # Get network offering
        network_offerings = NetworkOffering.list(
            self.apiclient, name="DefaultIsolatedNetworkOfferingForVpcNetworks")
        self.assertTrue(network_offerings, "No VPC network offering")

        # Getting VPC offering
        vpc_offerings = VpcOffering.list(self.apiclient, name="Default VPC offering")
        self.assertTrue(vpc_offerings, "No VPC offerings found")

        # Creating VPC
        vpc = VPC.create(
            apiclient=self.apiclient,
            services=self.services["vpc"],
            networkDomain="vpc.networkacl",
            vpcofferingid=vpc_offerings[0].id,
            zoneid=self.zone.id,
            domainid=self.domain.id
        )
        # Assert first so that a None result is never registered for cleanup.
        self.assertIsNotNone(vpc, "VPC creation failed")
        self.cleanup.append(vpc)

        # Creating ACL list
        acl = NetworkACLList.create(apiclient=self.apiclient, services={},
                                    name="acl", description="acl")
        self.cleanup.append(acl)

        # Creating tier on VPC with ACL list
        network = Network.create(
            apiclient=self.apiclient,
            services=self.services["vpcnetwork"],
            accountid="Admin",
            domainid=self.domain.id,
            networkofferingid=network_offerings[0].id,
            zoneid=self.zone.id,
            vpcid=vpc.id,
            aclid=acl.id,
            gateway="10.1.1.1",
            netmask="255.255.255.192"
        )
        self.cleanup.append(network)

        # User should be able to replace ACL
        network.replaceACLList(apiclient=self.user_apiclient, aclid=acl.id)
        # Domain Admin should be able to replace ACL
        network.replaceACLList(apiclient=self.domain_admin_apiclient, aclid=acl.id)
        # Admin should be able to replace ACL
        network.replaceACLList(apiclient=self.admin_apiclient, aclid=acl.id)

    @attr(tags=["advanced", "basic"], required_hardware="false")
    def test_create_acl_rule(self):
        """ Test to create ACL rule as a normal user, domain admin and root admin users.

        Rule creation on a global ACL is expected to be rejected for the
        normal user and the domain admin and to succeed for the root admin.
        """
        denied = "Only Root Admins can create rules for a global ACL."

        # Creating ACL list
        acl = NetworkACLList.create(apiclient=self.admin_apiclient, services={},
                                    name="acl", description="acl")
        self.cleanup.append(acl)

        self.debug("Creating ACL rule as a user, should raise exception.")
        with self.assertRaisesRegex(CloudstackAPIException, denied):
            NetworkACL.create(self.user_apiclient, services=self.services["rule"], aclid=acl.id)

        self.debug("Creating ACL rule as a domain admin, should raise exception.")
        with self.assertRaisesRegex(CloudstackAPIException, denied):
            NetworkACL.create(self.domain_admin_apiclient, services=self.services["rule"], aclid=acl.id)

        self.debug("Creating ACL rule as a root admin, should work.")
        acl_rule = NetworkACL.create(self.admin_apiclient, services=self.services["rule"], aclid=acl.id)
        self.cleanup.append(acl_rule)

    @attr(tags=["advanced", "basic"], required_hardware="false")
    def test_delete_acl_rule(self):
        """ Test to delete ACL rule as a normal user, domain admin and root admin users.

        Deleting a rule of a global ACL is expected to be rejected for the
        normal user and the domain admin and to succeed for the root admin.
        """
        denied = "Only Root Admin can delete global ACL rules."

        # Creating ACL list
        acl = NetworkACLList.create(apiclient=self.apiclient, services={},
                                    name="acl", description="acl")
        self.cleanup.append(acl)

        # Creating ACL rule. It is registered for cleanup so it is still
        # released if the test fails before the root admin deletes it.
        acl_rule = NetworkACL.create(self.apiclient, services=self.services["rule"], aclid=acl.id)
        self.cleanup.append(acl_rule)

        self.debug("Deleting ACL rule as a user, should raise exception.")
        with self.assertRaisesRegex(Exception, denied):
            acl_rule.delete(self.user_apiclient)

        self.debug("Deleting ACL rule as a domain admin, should raise exception.")
        with self.assertRaisesRegex(Exception, denied):
            acl_rule.delete(self.domain_admin_apiclient)

        self.debug("Deleting ACL rule as a root admin, should work.")
        acl_rule.delete(self.admin_apiclient)
        # Already deleted, so it must not be deleted a second time during cleanup.
        self.cleanup.remove(acl_rule)

        # Verify the deleted rule itself is gone rather than counting every
        # rule in the environment, which would break as soon as any other
        # ACL rule exists. `or []` guards against an empty/None listing.
        remaining_rules = NetworkACL.list(apiclient=self.admin_apiclient) or []
        self.assertNotIn(
            acl_rule.id,
            [rule.id for rule in remaining_rules],
            "The ACL rule deleted by the root admin is still listed.",
        )

    @attr(tags=["advanced", "basic"], required_hardware="false")
    def test_delete_global_acl(self):
        """ Test delete global ACL as a normal user, domain admin and root admin users.

        Deletion is expected to be rejected for the normal user and the
        domain admin and to succeed for the root admin.
        """
        denied = "Only Root Admin can delete global ACLs."

        # Creating ACL list. It is registered for cleanup so it is still
        # released if the test fails before the root admin deletes it; it is
        # taken out of the cleanup list again once it has been deleted.
        acl = NetworkACLList.create(apiclient=self.apiclient, services={},
                                    name="acl", description="acl")
        self.cleanup.append(acl)

        self.debug("Deleting ACL list as a normal user, should raise exception.")
        with self.assertRaisesRegex(Exception, denied):
            acl.delete(apiclient=self.user_apiclient)

        self.debug("Deleting ACL list as a domain admin, should raise exception.")
        with self.assertRaisesRegex(Exception, denied):
            acl.delete(apiclient=self.domain_admin_apiclient)

        self.debug("Deleting ACL list as a root admin, should work.")
        acl.delete(apiclient=self.admin_apiclient)
        self.cleanup.remove(acl)

        # Verify the deleted ACL list itself is gone rather than counting
        # every ACL list in the environment, which would break as soon as any
        # other custom ACL exists. `or []` guards against an empty/None listing.
        remaining_acls = NetworkACLList.list(apiclient=self.admin_apiclient) or []
        self.assertNotIn(
            acl.id,
            [remaining.id for remaining in remaining_acls],
            "The ACL list deleted by the root admin is still listed.",
        )