| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from nose.plugins.attrib import attr |
| from marvin.cloudstackTestCase import * |
| from marvin.cloudstackAPI import * |
| from marvin.lib.utils import * |
| from marvin.lib.base import * |
| from marvin.lib.common import * |
| |
| import requests |
| |
| |
| class TestLogin(cloudstackTestCase): |
| """ |
| Tests default login API handler |
| """ |
| |
| def setUp(self): |
| self.apiclient = self.testClient.getApiClient() |
| self.dbclient = self.testClient.getDbConnection() |
| self.server_details = self.config.__dict__["mgtSvr"][0].__dict__ |
| self.server_url = "http://%s:8080/client/api" % self.server_details['mgtSvrIp'] |
| self.testdata = { |
| "account": { |
| "email": "login-user@test.cloud", |
| "firstname": "TestLoginFirstName", |
| "lastname": "TestLoginLastName", |
| "username": "testloginuser-", |
| "password": "password123", |
| } |
| } |
| self.cleanup = [] |
| |
| |
| def tearDown(self): |
| try: |
| cleanup_resources(self.apiclient, self.cleanup) |
| except Exception as e: |
| raise Exception("Warning: Exception during cleanup : %s" % e) |
| |
| |
| def login(self, username, password, domain="/"): |
| """ |
| Logs in and returns a session to be used for subsequent API calls |
| """ |
| args = {} |
| args["command"] = 'login' |
| args["username"] = username |
| args["password"] = password |
| args["domain"] = domain |
| args["response"] = "json" |
| |
| session = requests.Session() |
| |
| try: |
| resp = session.post(self.server_url, params=args, verify=False) |
| except requests.exceptions.ConnectionError as e: |
| self.fail("Failed to attempt login request to mgmt server") |
| return None, None |
| |
| return resp, session |
| |
| |
| @attr(tags = ["devcloud", "advanced", "advancedns", "advancedsg", "smoke", |
| "basic", "sg"], required_hardware="false") |
| def login_test_saml_user(self): |
| """ |
| Tests that SAML users are not allowed CloudStack local log in |
| |
| Creates account across various account types and converts them to |
| a SAML user and tests that they are not able to log in; then |
| converts them back as a CloudStack user account and verifies that |
| they are allowed to log in and make API requests |
| """ |
| # Tests across various account types: 0=User, 1=Root Admin, 2=Domain Admin |
| for account_type in range(0, 3): |
| account = Account.create( |
| self.apiclient, |
| self.testdata['account'], |
| admin=account_type |
| ) |
| self.cleanup.append(account) |
| |
| username = account.user[0].username |
| password = self.testdata['account']['password'] |
| |
| # Convert newly created account user to SAML user |
| user_id = self.dbclient.execute("select id from user where uuid='%s'" % account.user[0].id)[0][0] |
| self.dbclient.execute("update user set source='SAML2' where id=%d" % user_id) |
| |
| response, session = self.login(username, password) |
| self.assertEqual( |
| response.json()['loginresponse']['errorcode'], |
| 531, |
| "SAML user should not be allowed to log in, error code 531 not returned" |
| ) |
| self.assertEqual( |
| response.json()['loginresponse']['errortext'], |
| "User is not allowed CloudStack login", |
| "Invalid error message returned, SAML user should not be allowed to log in" |
| ) |
| |
| # Convert newly created account user back to normal source |
| self.dbclient.execute("update user set source='UNKNOWN' where id=%d" % user_id) |
| |
| response, session = self.login(username, password) |
| self.assertEqual( |
| response.status_code, |
| 200, |
| "Login response code was not 200" |
| ) |
| self.assertTrue( |
| len(response.json()['loginresponse']['sessionkey']) > 0, |
| "Invalid session key received" |
| ) |
| |
| args = {} |
| args["command"] = 'listUsers' |
| args["listall"] = 'true' |
| args["response"] = "json" |
| args["sessionkey"] = response.json()['loginresponse']['sessionkey'] |
| response = session.get(self.server_url, params=args) |
| self.assertEqual( |
| response.status_code, |
| 200, |
| "listUsers response code was not 200" |
| ) |
| self.assertTrue( |
| len(response.json()['listusersresponse']['user']) > 0, |
| "listUsers list is empty or zero" |
| ) |