blob: 24fed74438a85a3709c79b7c3246a50e82be9fb3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from marvin.cloudstackAPI import *
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.cloudstackException import CloudstackAPIException
from marvin.lib.base import Account, User, Project, ProjectRole, ProjectRolePermission, PublicIPAddress
from marvin.lib.utils import cleanup_resources
from nose.plugins.attrib import attr
from random import shuffle
import copy
import random
import re
class TestData(object):
"""Test data object that is required to create resources
"""
def __init__(self):
self.testdata = {
"account": {
"email": "user@test.com",
"firstname": "User",
"lastname": "User",
"username": "user",
"password": "password",
},
"useracc": {
"email": "user1@test.com",
"firstname": "User",
"lastname": "User",
"username": "user1",
"password": "fr3sca",
},
"project": {
"name": "Test Project",
"displaytext": "Test project",
},
"projectrole": {
"name": "MarvinFake project Role ",
"description": "Fake project Role created by Marvin test"
},
"projectrolepermission": {
"rule": "listPublicIpAddresses",
"permission": "deny",
"description": "Fake role permission created by Marvin test"
}
}
class TestRoleBasedUsersInProjects(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.testdata = TestData().testdata
feature_enabled = self.apiclient.listCapabilities(listCapabilities.listCapabilitiesCmd()).dynamicrolesenabled
if not feature_enabled:
self.skipTest("Dynamic Role-Based API checker not enabled, skipping test")
self.testdata["projectrole"]["name"] += self.getRandomString()
self.project = Project.create(self.apiclient, self.testdata["project"])
self.projectrole = ProjectRole.create(
self.apiclient,
self.testdata["projectrole"],
self.project.id
)
self.testdata["projectrolepermission"]["projectroleid"] = self.projectrole.id
self.projectrolepermission = ProjectRolePermission.create(
self.apiclient,
self.testdata["projectrolepermission"],
self.project.id
)
self.cleanup = [self.project]
def tearDown(self):
try:
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
self.debug("Warning! Exception in tearDown: %s" % e)
def getUserApiClient(self, username, domain='ROOT', role_type='User'):
self.user_apiclient = self.testClient.getUserApiClient(UserName=username, DomainName=domain,
type=self.translateRoleToAccountType(role_type))
return self.user_apiclient
def getRandomString(self):
return "".join(random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(10))
def translateRoleToAccountType(self, role_type):
if role_type == "User":
return 0
elif role_type == "Admin":
return 1
elif role_type == "DomainAdmin":
return 2
elif role_type == "ResourceAdmin":
return 3
return -1
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
def test_add_account_to_project_with_project_role(self):
"""
1. Create a User Account
2. Add user account with 'Regular' project account role associate it with a Project role;
The role defines what APIs are allowed/disallowed for the user: here, 'listPublicIpAddresses'
is denied for the user account
3. Execute the 'listPublicIpAddresses' API and verify/confirm that the API isn't allowed to be executed
by the account
"""
self.useraccount = Account.create(
self.apiclient,
self.testdata["account"],
roleid=4
)
self.cleanup.append(self.useraccount)
# Add account to the project
self.project.addAccount(
self.apiclient,
account=self.useraccount.name,
projectroleid=self.projectrole.id
)
self.userapiclient = self.testClient.getUserApiClient(
UserName=self.useraccount.name,
DomainName=self.useraccount.domain,
type=0)
try:
PublicIPAddress.list(
self.userapiclient,
projectid=self.project.id
)
self.fail("API call succeeded which is denied for the project role")
except CloudstackAPIException:
pass
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
def test_add_user_to_project_with_project_role(self):
"""
1. Create a User Account
2. Add user of an account with 'Regular' project account role associate it with a Project role;
The role defines what APIs are allowed/disallowed for the user: here, 'listPublicIpAddresses'
is denied for the user account
3. Execute the 'listPublicIpAddresses' API and verify/confirm that the API isn't allowed to be executed
by the user
"""
self.useraccount = Account.create(
self.apiclient,
self.testdata["account"],
roleid=4
)
self.cleanup.append(self.useraccount)
# Add account to the project
self.project.addUser(
self.apiclient,
username=self.useraccount.user[0].username,
projectroleid=self.projectrole.id
)
Project.listAccounts(self.apiclient, projectid=self.project.id)
self.userapiclient = self.testClient.getUserApiClient(
UserName=self.useraccount.name,
DomainName=self.useraccount.domain,
type=0)
try:
PublicIPAddress.list(
self.userapiclient,
projectid=self.project.id
)
self.fail("API call succeeded which is denied for the project role")
except CloudstackAPIException:
pass
@attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
def test_add_multiple_admins_in_project(self):
"""
1. Create a User Account
2. Add user account with 'Admin' project account role and associate it with a Project role;
The role defines what APIs are allowed/disallowed for the user: here, 'listPublicIpAddresses'
is denied for the user account
3. Execute the 'listPublicIpAddresses' API and verify/confirm that the user/account can execute the
API as it is a project admin
"""
self.useraccount = Account.create(
self.apiclient,
self.testdata["account"],
roleid=4
)
self.cleanup.append(self.useraccount)
self.useraccount1 = Account.create(
self.apiclient,
self.testdata["useracc"],
roleid=4
)
self.cleanup.append(self.useraccount1)
self.project.addAccount(
self.apiclient,
account=self.useraccount.name,
projectroleid=self.projectrole.id,
roletype='Admin'
)
self.project.addAccount(
self.apiclient,
account=self.useraccount1.name,
projectroleid=self.projectrole.id
)
project_accounts = Project.listAccounts(self.apiclient, projectid=self.project.id, role='Admin')
self.assertEqual(len(project_accounts), 2, "account not added with admin Role")
self.userapiclientAdminRole = self.testClient.getUserApiClient(
UserName=self.useraccount.name,
DomainName=self.useraccount.domain,
type=0)
self.userapiclientRegularRole = self.testClient.getUserApiClient(
UserName=self.useraccount1.name,
DomainName=self.useraccount1.domain,
type=0)
try:
PublicIPAddress.list(
self.userapiclientAdminRole,
projectid=self.project.id
)
self.debug("User added to the project could execute the listPublicIpAddresses API despite the project "
"role as it is the Admin")
pass
except CloudstackAPIException:
self.fail("User is an Admin, should be able to execute the command despite Project role")
try:
self.project.suspend(
self.userapiclientAdminRole,
)
self.debug("The user can perform Project administrative operations as it is added as "
"an Admin to the project")
pass
except CloudstackAPIException:
self.fail("User should be allowed to execute project administrative operations"
"as it is the Project Admin")
try:
self.project.suspend(
self.userapiclientRegularRole,
)
except Exception as e:
pass