# blob: f8545a3cf0f3392575f45cf33f0983cb66b3929c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import datetime
from unittest.mock import Mock
from libcloud.test import MockHttp, unittest
from libcloud.utils.py3 import httplib, assertRaisesRegex
from libcloud.test.secrets import OPENSTACK_PARAMS
from libcloud.common.openstack import OpenStackBaseConnection
from libcloud.test.file_fixtures import ComputeFileFixtures
from libcloud.common.openstack_identity import (
AUTH_TOKEN_EXPIRES_GRACE_SECONDS,
OpenStackIdentityUser,
OpenStackServiceCatalog,
OpenStackIdentity_2_0_Connection,
OpenStackIdentity_3_0_Connection,
OpenStackIdentity_2_0_Connection_VOMS,
OpenStackIdentity_3_0_Connection_AppCred,
OpenStackIdentity_3_0_Connection_OIDC_access_token,
get_class_for_auth_version,
)
from libcloud.compute.drivers.openstack import OpenStack_1_0_NodeDriver
from libcloud.test.compute.test_openstack import (
OpenStackMockHttp,
OpenStack_2_0_MockHttp,
OpenStackMockAuthCache,
)
try:
import simplejson as json
except ImportError:
import json
TOMORROW = datetime.datetime.today() + datetime.timedelta(1)
YESTERDAY = datetime.datetime.today() - datetime.timedelta(1)
class OpenStackIdentityConnectionTestCase(unittest.TestCase):
    """Cross-version tests for the OpenStack identity connection classes.

    Every test resolves a connection class through
    ``get_class_for_auth_version`` and runs it against one of the mock HTTP
    classes rather than a real identity endpoint.
    """

    def setUp(self):
        # Reset the class-level attributes that the tests below overwrite so
        # that state does not leak from one test into the next.
        OpenStackBaseConnection.auth_url = None
        OpenStackBaseConnection.conn_class = OpenStackMockHttp
        OpenStack_2_0_MockHttp.type = None
        OpenStackIdentity_3_0_MockHttp.type = None

    def test_auth_url_is_correctly_assembled(self):
        # For each auth version and each base URL shape (with/without path,
        # with/without trailing slash) the request action must be the base
        # path followed by the version-specific suffix.
        tuples = [
            ("1.0", OpenStackMockHttp, {}),
            ("1.1", OpenStackMockHttp, {}),
            ("2.0", OpenStack_2_0_MockHttp, {}),
            ("2.0_apikey", OpenStack_2_0_MockHttp, {}),
            ("2.0_password", OpenStack_2_0_MockHttp, {}),
            (
                "3.x_password",
                OpenStackIdentity_3_0_MockHttp,
                {"tenant_name": "tenant-name"},
            ),
            ("3.x_appcred", OpenStackIdentity_3_0_MockHttp, {}),
            (
                "3.x_oidc_access_token",
                OpenStackIdentity_3_0_MockHttp,
                {"tenant_name": "tenant-name"},
            ),
        ]

        # (auth URL handed to the connection, path component expected in the action)
        auth_urls = [
            ("https://auth.api.example.com", ""),
            ("https://auth.api.example.com/", "/"),
            ("https://auth.api.example.com/foo/bar", "/foo/bar"),
            ("https://auth.api.example.com/foo/bar/", "/foo/bar/"),
        ]

        actions = {
            "1.0": "{url_path}/v1.0",
            "1.1": "{url_path}/v1.1/auth",
            "2.0": "{url_path}/v2.0/tokens",
            "2.0_apikey": "{url_path}/v2.0/tokens",
            "2.0_password": "{url_path}/v2.0/tokens",
            "3.x_password": "{url_path}/v3/auth/tokens",
            "3.x_appcred": "{url_path}/v3/auth/tokens",
            "3.x_oidc_access_token": "{url_path}/v3/OS-FEDERATION/identity_providers/user_name/protocols/tenant-name/auth",
        }

        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        for auth_version, mock_http_class, kwargs in tuples:
            for url, url_path in auth_urls:
                connection = self._get_mock_connection(
                    mock_http_class=mock_http_class, auth_url=url
                )
                auth_url = connection.auth_url

                cls = get_class_for_auth_version(auth_version=auth_version)
                osa = cls(
                    auth_url=auth_url,
                    user_id=user_id,
                    key=key,
                    parent_conn=connection,
                    **kwargs,
                )

                # Deliberately best-effort: only the action recorded on the
                # connection is checked below, not whether the mocked
                # authentication round-trip succeeded.
                try:
                    osa = osa.authenticate()
                except Exception:
                    pass

                # A trailing slash in the base path would otherwise produce
                # a doubled slash in the expected value.
                expected_path = actions[auth_version].format(url_path=url_path).replace("//", "/")
                self.assertEqual(osa.action, expected_path)

    def test_basic_authentication(self):
        # A fresh connection starts with no URLs / token / user info and has
        # them populated after authenticate().
        tuples = [
            ("1.0", OpenStackMockHttp, {}),
            ("1.1", OpenStackMockHttp, {}),
            ("2.0", OpenStack_2_0_MockHttp, {}),
            ("2.0_apikey", OpenStack_2_0_MockHttp, {}),
            ("2.0_password", OpenStack_2_0_MockHttp, {}),
            (
                "3.x_password",
                OpenStackIdentity_3_0_MockHttp,
                {
                    "user_id": "test_user_id",
                    "key": "test_key",
                    "token_scope": "project",
                    "tenant_name": "test_tenant",
                    "tenant_domain_id": "test_tenant_domain_id",
                    "domain_name": "test_domain",
                },
            ),
            (
                "3.x_appcred",
                OpenStackIdentity_3_0_MockHttp,
                {"user_id": "appcred_id", "key": "appcred_secret"},
            ),
            (
                "3.x_oidc_access_token",
                OpenStackIdentity_3_0_MockHttp,
                {
                    "user_id": "test_user_id",
                    "key": "test_key",
                    "token_scope": "domain",
                    "tenant_name": "test_tenant",
                    "tenant_domain_id": "test_tenant_domain_id",
                    "domain_name": "test_domain",
                },
            ),
        ]

        # Versions for which this test expects an expiry / user info after
        # authentication.
        versions_with_expiry = (
            "1.1",
            "2.0",
            "2.0_apikey",
            "2.0_password",
            "3.x_password",
            "3.x_appcred",
            "3.x_oidc_access_token",
        )
        versions_with_user_info = (
            "2.0",
            "2.0_apikey",
            "2.0_password",
            "3.x_password",
            "3.x_appcred",
            "3.x_oidc_access_token",
        )

        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        for auth_version, mock_http_class, kwargs in tuples:
            connection = self._get_mock_connection(mock_http_class=mock_http_class)
            auth_url = connection.auth_url

            # Entries without explicit credentials fall back to the shared
            # test credentials.
            if not kwargs:
                kwargs["user_id"] = user_id
                kwargs["key"] = key

            cls = get_class_for_auth_version(auth_version=auth_version)
            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)

            self.assertEqual(osa.urls, {})
            self.assertIsNone(osa.auth_token)
            self.assertIsNone(osa.auth_user_info)

            osa = osa.authenticate()

            self.assertGreaterEqual(len(osa.urls), 1)
            self.assertIsNotNone(osa.auth_token)

            if auth_version in versions_with_expiry:
                self.assertIsNotNone(osa.auth_token_expires)

            if auth_version in versions_with_user_info:
                self.assertIsNotNone(osa.auth_user_info)

    def test_token_expiration_and_force_reauthentication(self):
        # Count calls to the underlying v2.0 auth method to verify when
        # authenticate() actually hits the API.
        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        connection = self._get_mock_connection(OpenStack_2_0_MockHttp)
        auth_url = connection.auth_url

        osa = OpenStackIdentity_2_0_Connection(
            auth_url=auth_url, user_id=user_id, key=key, parent_conn=connection
        )

        mocked_auth_method = Mock(wraps=osa._authenticate_2_0_with_body)
        osa._authenticate_2_0_with_body = mocked_auth_method

        # Force re-auth, expired token
        osa.auth_token = None
        osa.auth_token_expires = YESTERDAY
        count = 5

        for _ in range(count):
            osa.authenticate(force=True)

        self.assertEqual(mocked_auth_method.call_count, count)

        # No force reauth, expired token
        osa.auth_token = None
        osa.auth_token_expires = YESTERDAY

        mocked_auth_method.call_count = 0
        self.assertEqual(mocked_auth_method.call_count, 0)

        for _ in range(count):
            osa.authenticate(force=False)

        self.assertEqual(mocked_auth_method.call_count, 1)

        # No force reauth, valid / non-expired token
        osa.auth_token = None

        mocked_auth_method.call_count = 0
        self.assertEqual(mocked_auth_method.call_count, 0)

        for i in range(count):
            osa.authenticate(force=False)

            if i == 0:
                osa.auth_token_expires = TOMORROW

        self.assertEqual(mocked_auth_method.call_count, 1)

        # No force reauth, valid / non-expired token which is about to expire in
        # less than AUTH_TOKEN_EXPIRES_GRACE_SECONDS
        soon = datetime.datetime.utcnow() + datetime.timedelta(
            seconds=AUTH_TOKEN_EXPIRES_GRACE_SECONDS - 1
        )
        osa.auth_token = None

        mocked_auth_method.call_count = 0
        self.assertEqual(mocked_auth_method.call_count, 0)

        for i in range(count):
            if i == 0:
                osa.auth_token_expires = soon

            osa.authenticate(force=False)

        self.assertEqual(mocked_auth_method.call_count, 1)

    def test_authentication_cache(self):
        # Verify that tokens are stored in, re-used from, refreshed in and
        # evicted from the supplied auth cache.
        tuples = [
            # 1.0 does not provide token expiration, so it always
            # re-authenticates and never uses the cache.
            # ('1.0', OpenStackMockHttp, {}),
            ("1.1", OpenStackMockHttp, {}),
            ("2.0", OpenStack_2_0_MockHttp, {}),
            ("2.0_apikey", OpenStack_2_0_MockHttp, {}),
            ("2.0_password", OpenStack_2_0_MockHttp, {}),
            (
                "3.x_password",
                OpenStackIdentity_3_0_MockHttp,
                {
                    "user_id": "test_user_id",
                    "key": "test_key",
                    "token_scope": "project",
                    "tenant_name": "test_tenant",
                    "tenant_domain_id": "test_tenant_domain_id",
                    "domain_name": "test_domain",
                },
            ),
            (
                "3.x_oidc_access_token",
                OpenStackIdentity_3_0_MockHttp,
                {
                    "user_id": "test_user_id",
                    "key": "test_key",
                    "token_scope": "domain",
                    "tenant_name": "test_tenant",
                    "tenant_domain_id": "test_tenant_domain_id",
                    "domain_name": "test_domain",
                },
            ),
        ]

        legacy_versions = ("1.1", "2.0", "2.0_apikey", "2.0_password")
        v3_versions = ("3.x_password", "3.x_oidc_access_token")

        user_id = OPENSTACK_PARAMS[0]
        key = OPENSTACK_PARAMS[1]

        for auth_version, mock_http_class, kwargs in tuples:
            mock_http_class.type = None
            connection = self._get_mock_connection(mock_http_class=mock_http_class)
            auth_url = connection.auth_url

            if not kwargs:
                kwargs["user_id"] = user_id
                kwargs["key"] = key

            auth_cache = OpenStackMockAuthCache()
            self.assertEqual(len(auth_cache), 0)
            kwargs["auth_cache"] = auth_cache

            cls = get_class_for_auth_version(auth_version=auth_version)
            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
            osa = osa.authenticate()

            # Token is cached
            self.assertEqual(len(auth_cache), 1)

            # New client, token from cache is re-used
            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
            osa.request = Mock(wraps=osa.request)
            osa = osa.authenticate()

            # No auth API call
            if auth_version in legacy_versions:
                self.assertEqual(osa.request.call_count, 0)
            elif auth_version in v3_versions:
                # v3 only caches token and expiration; service catalog URLs
                # and the rest of the auth context are fetched from Keystone
                osa.request.assert_called_once_with(
                    action="/v3/auth/tokens",
                    params=None,
                    data=None,
                    headers={
                        "X-Subject-Token": "00000000000000000000000000000000",
                        "X-Auth-Token": "00000000000000000000000000000000",
                    },
                    method="GET",
                    raw=False,
                )

            # Cache size unchanged
            self.assertEqual(len(auth_cache), 1)

            # Authenticates if cached token expired
            cache_key = list(auth_cache.store.keys())[0]
            auth_context = auth_cache.get(cache_key)
            auth_context.expiration = YESTERDAY
            auth_cache.put(cache_key, auth_context)

            osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
            osa.request = Mock(wraps=osa.request)
            osa._get_unscoped_token_from_oidc_token = Mock(return_value="000")
            OpenStackIdentity_3_0_MockHttp.type = "GET_UNAUTHORIZED_POST_OK"
            osa = osa.authenticate()

            # assertEqual, not assertTrue: assertTrue's second argument is
            # only the failure message, so it would never compare the values.
            if auth_version in legacy_versions:
                self.assertEqual(osa.request.call_count, 1)
                self.assertEqual(osa.request.call_args[1]["method"], "POST")
            elif auth_version in v3_versions:
                self.assertEqual(osa.request.call_args[0][0], "/v3/auth/tokens")
                self.assertEqual(osa.request.call_args[1]["method"], "POST")

            # Token evicted from cache if 401 received on another call
            if hasattr(osa, "list_projects"):
                mock_http_class.type = None
                auth_cache.reset()

                osa = cls(auth_url=auth_url, parent_conn=connection, **kwargs)
                osa.request = Mock(wraps=osa.request)
                osa = osa.authenticate()
                self.assertEqual(len(auth_cache), 1)

                mock_http_class.type = "UNAUTHORIZED"
                try:
                    osa.list_projects()
                except Exception:  # These methods don't handle 401s
                    pass
                self.assertEqual(len(auth_cache), 0)

    def _get_mock_connection(self, mock_http_class, auth_url=None):
        """Return an ``OpenStackBaseConnection`` wired to ``mock_http_class``.

        :param mock_http_class: Mock HTTP class installed as
            ``OpenStackBaseConnection.conn_class``.
        :param auth_url: Auth URL to install on ``OpenStackBaseConnection``;
            defaults to ``https://auth.api.example.com`` when ``None``.
        :return: The connection, with a forced base URL and an
            ``OpenStack_1_0_NodeDriver`` attached as its driver.
        """
        OpenStackBaseConnection.conn_class = mock_http_class

        if auth_url is None:
            auth_url = "https://auth.api.example.com"

        OpenStackBaseConnection.auth_url = auth_url
        connection = OpenStackBaseConnection(*OPENSTACK_PARAMS)

        connection._ex_force_base_url = "https://www.foo.com"
        connection.driver = OpenStack_1_0_NodeDriver(*OPENSTACK_PARAMS)

        return connection
class OpenStackIdentity_2_0_ConnectionTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_2_0_Connection`` backed by the v2 mock."""

    def setUp(self):
        # Route the connection class through the v2 mock with no response
        # variant selected.
        OpenStackIdentity_2_0_MockHttp.type = None
        OpenStackIdentity_2_0_Connection.conn_class = OpenStackIdentity_2_0_MockHttp

        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "test",
            "key": "test",
            "tenant_name": "test",
            "proxy_url": "http://proxy:8080",
            "timeout": 10,
        }
        self.auth_instance = OpenStackIdentity_2_0_Connection(**connection_kwargs)
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

        # The proxy URL passed to the constructor must be kept on the instance.
        self.assertEqual(self.auth_instance.proxy_url, "http://proxy:8080")

    def test_list_projects(self):
        # The tenants fixture is expected to hold two projects; check the
        # fields of the first one.
        projects = self.auth_instance.list_projects()
        self.assertEqual(len(projects), 2)

        first = projects[0]
        self.assertEqual(first.id, "a")
        self.assertEqual(first.name, "test")
        self.assertEqual(first.description, "test project")
        self.assertTrue(first.enabled)
class OpenStackIdentity_3_0_ConnectionTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_3_0_Connection`` backed by the v3 mock."""

    def setUp(self):
        # Route the connection class through the v3 mock with no response
        # variant selected.
        OpenStackIdentity_3_0_MockHttp.type = None
        OpenStackIdentity_3_0_Connection.conn_class = OpenStackIdentity_3_0_MockHttp

        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "test",
            "key": "test",
            "tenant_name": "test",
            "proxy_url": "http://proxy:8080",
            "timeout": 10,
        }
        self.auth_instance = OpenStackIdentity_3_0_Connection(**connection_kwargs)
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

        self.assertEqual(self.auth_instance.proxy_url, "http://proxy:8080")

    def test_token_scope_argument(self):
        base_kwargs = {"auth_url": "http://none", "user_id": "test", "key": "test"}

        # (expected error message, extra constructor kwargs) for each
        # rejected combination, in order: invalid token_scope value,
        # missing tenant_name, missing domain_name.
        rejected = (
            ('Invalid value for "token_scope" argument: foo', {"token_scope": "foo"}),
            (
                "Must provide tenant_name and domain_name argument",
                {"token_scope": "project"},
            ),
            (
                "Must provide domain_name argument",
                {"token_scope": "domain", "domain_name": None},
            ),
        )
        for expected_msg, extra_kwargs in rejected:
            assertRaisesRegex(
                self,
                ValueError,
                expected_msg,
                OpenStackIdentity_3_0_Connection,
                **base_kwargs,
                **extra_kwargs,
            )

        # Scope to project all ok
        OpenStackIdentity_3_0_Connection(
            token_scope="project",
            tenant_name="test",
            domain_name="Default",
            **base_kwargs,
        )

        # Scope to domain
        OpenStackIdentity_3_0_Connection(
            token_scope="domain",
            tenant_name=None,
            domain_name="Default",
            **base_kwargs,
        )

    def test_authenticate(self):
        # Project-scoped password authentication must succeed against the
        # mock and leave the proxy URL untouched.
        connection = OpenStackIdentity_3_0_Connection(
            auth_url="http://none",
            user_id="test_user_id",
            key="test_key",
            token_scope="project",
            tenant_name="test_tenant",
            tenant_domain_id="test_tenant_domain_id",
            domain_name="test_domain",
            proxy_url="http://proxy:8080",
            timeout=10,
        )
        connection.authenticate()
        self.assertEqual(connection.proxy_url, "http://proxy:8080")

    def test_list_supported_versions(self):
        OpenStackIdentity_3_0_MockHttp.type = "v3"

        versions = self.auth_instance.list_supported_versions()
        self.assertEqual(len(versions), 2)

        older, newer = versions[0], versions[1]
        self.assertEqual(older.version, "v2.0")
        self.assertEqual(older.url, "http://192.168.18.100:5000/v2.0/")
        self.assertEqual(newer.version, "v3.0")
        self.assertEqual(newer.url, "http://192.168.18.100:5000/v3/")

    def test_list_domains(self):
        domains = self.auth_instance.list_domains()
        self.assertEqual(len(domains), 1)

        only_domain = domains[0]
        self.assertEqual(only_domain.id, "default")
        self.assertEqual(only_domain.name, "Default")
        self.assertTrue(only_domain.enabled)

    def test_list_projects(self):
        projects = self.auth_instance.list_projects()
        self.assertEqual(len(projects), 4)

        first = projects[0]
        self.assertEqual(first.id, "a")
        self.assertEqual(first.domain_id, "default")
        self.assertTrue(first.enabled)
        self.assertEqual(first.description, "Test project")

    def test_list_users(self):
        users = self.auth_instance.list_users()
        self.assertEqual(len(users), 12)

        first = users[0]
        self.assertEqual(first.id, "a")
        self.assertEqual(first.domain_id, "default")
        self.assertEqual(first.enabled, True)
        self.assertEqual(first.email, "openstack-test@localhost")

    def test_list_roles(self):
        roles = self.auth_instance.list_roles()
        self.assertEqual(len(roles), 2)

        second = roles[1]
        self.assertEqual(second.id, "b")
        self.assertEqual(second.name, "admin")

    def test_list_user_projects(self):
        first_user = self.auth_instance.list_users()[0]
        user_projects = self.auth_instance.list_user_projects(user=first_user)
        self.assertEqual(len(user_projects), 0)

    def test_list_user_domain_roles(self):
        first_user = self.auth_instance.list_users()[0]
        first_domain = self.auth_instance.list_domains()[0]

        roles = self.auth_instance.list_user_domain_roles(domain=first_domain, user=first_user)
        self.assertEqual(len(roles), 1)
        self.assertEqual(roles[0].name, "admin")

    def test_get_domain(self):
        fetched = self.auth_instance.get_domain(domain_id="default")
        self.assertEqual(fetched.name, "Default")

    def test_get_user(self):
        fetched = self.auth_instance.get_user(user_id="a")
        self.assertEqual(fetched.id, "a")
        self.assertEqual(fetched.domain_id, "default")
        self.assertEqual(fetched.enabled, True)
        self.assertEqual(fetched.email, "openstack-test@localhost")

    def test_get_user_without_email(self):
        fetched = self.auth_instance.get_user(user_id="b")
        self.assertEqual(fetched.id, "b")
        self.assertEqual(fetched.name, "userwithoutemail")
        self.assertIsNone(fetched.email)

    def test_get_user_without_enabled(self):
        fetched = self.auth_instance.get_user(user_id="c")
        self.assertEqual(fetched.id, "c")
        self.assertEqual(fetched.name, "userwithoutenabled")
        self.assertIsNone(fetched.enabled)

    def test_create_user(self):
        created = self.auth_instance.create_user(
            email="test2@localhost", password="test1", name="test2", domain_id="default"
        )
        self.assertEqual(created.id, "c")
        self.assertEqual(created.name, "test2")

    def test_enable_user(self):
        first_user = self.auth_instance.list_users()[0]
        outcome = self.auth_instance.enable_user(user=first_user)
        self.assertIsInstance(outcome, OpenStackIdentityUser)

    def test_disable_user(self):
        first_user = self.auth_instance.list_users()[0]
        outcome = self.auth_instance.disable_user(user=first_user)
        self.assertIsInstance(outcome, OpenStackIdentityUser)

    def test_grant_domain_role_to_user(self):
        conn = self.auth_instance
        first_domain = conn.list_domains()[0]
        first_role = conn.list_roles()[0]
        first_user = conn.list_users()[0]

        outcome = conn.grant_domain_role_to_user(
            domain=first_domain, role=first_role, user=first_user
        )
        self.assertTrue(outcome)

    def test_revoke_domain_role_from_user(self):
        conn = self.auth_instance
        first_domain = conn.list_domains()[0]
        first_role = conn.list_roles()[0]
        first_user = conn.list_users()[0]

        outcome = conn.revoke_domain_role_from_user(
            domain=first_domain, role=first_role, user=first_user
        )
        self.assertTrue(outcome)

    def test_grant_project_role_to_user(self):
        conn = self.auth_instance
        first_project = conn.list_projects()[0]
        first_role = conn.list_roles()[0]
        first_user = conn.list_users()[0]

        outcome = conn.grant_project_role_to_user(
            project=first_project, role=first_role, user=first_user
        )
        self.assertTrue(outcome)

    def test_revoke_project_role_from_user(self):
        conn = self.auth_instance
        first_project = conn.list_projects()[0]
        first_role = conn.list_roles()[0]
        first_user = conn.list_users()[0]

        outcome = conn.revoke_project_role_from_user(
            project=first_project, role=first_role, user=first_user
        )
        self.assertTrue(outcome)
class OpenStackIdentity_3_0_Connection_AppCredTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_3_0_Connection_AppCred``."""

    # Constructor arguments shared by setUp and the tests.
    _CONNECTION_KWARGS = {
        "auth_url": "http://none",
        "user_id": "appcred_id",
        "key": "appcred_secret",
        "proxy_url": "http://proxy:8080",
        "timeout": 10,
    }

    def setUp(self):
        # Route the connection class through the application-credential mock
        # with no response variant selected.
        OpenStackIdentity_3_0_AppCred_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_AppCred.conn_class = (
            OpenStackIdentity_3_0_AppCred_MockHttp
        )

        self.auth_instance = OpenStackIdentity_3_0_Connection_AppCred(**self._CONNECTION_KWARGS)
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

    def test_authenticate(self):
        # Authentication against the mock must complete without raising.
        connection = OpenStackIdentity_3_0_Connection_AppCred(**self._CONNECTION_KWARGS)
        connection.authenticate()
class OpenStackIdentity_3_0_Connection_OIDC_access_token_federation_projectsTests(
    unittest.TestCase
):
    """OIDC access-token tests served by the federation-projects mock."""

    def setUp(self):
        # Route the OIDC connection class through the federation-projects
        # mock with no response variant selected.
        OpenStackIdentity_3_0_federation_projects_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = (
            OpenStackIdentity_3_0_federation_projects_MockHttp
        )

        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "tenant_name": "oidc",
            "proxy_url": "http://proxy:8080",
            "timeout": 10,
        }
        self.auth_instance = OpenStackIdentity_3_0_Connection_OIDC_access_token(
            **connection_kwargs
        )
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

    def test_authenticate(self):
        # Project-scoped authentication without a domain_name must complete
        # without raising.
        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "token_scope": "project",
            "tenant_name": "oidc",
            "proxy_url": "http://proxy:8080",
            "timeout": 10,
        }
        connection = OpenStackIdentity_3_0_Connection_OIDC_access_token(**connection_kwargs)
        connection.authenticate()
class OpenStackIdentity_3_0_Connection_OIDC_access_tokenTests(unittest.TestCase):
    """OIDC access-token tests where ``domain_name`` is ``project_name2``."""

    def setUp(self):
        # Route the OIDC connection class through the v3 mock with no
        # response variant selected.
        OpenStackIdentity_3_0_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = (
            OpenStackIdentity_3_0_MockHttp
        )

        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "tenant_name": "oidc",
            "domain_name": "project_name2",
        }
        self.auth_instance = OpenStackIdentity_3_0_Connection_OIDC_access_token(
            **connection_kwargs
        )
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

    def test_authenticate(self):
        # Project-scoped authentication must complete without raising.
        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "token_scope": "project",
            "tenant_name": "oidc",
            "domain_name": "project_name2",
        }
        connection = OpenStackIdentity_3_0_Connection_OIDC_access_token(**connection_kwargs)
        connection.authenticate()
class OpenStackIdentity_3_0_Connection_OIDC_access_token_project_idTests(unittest.TestCase):
    """OIDC access-token tests where ``domain_name`` carries a project id."""

    def setUp(self):
        # Route the OIDC connection class through the v3 mock with no
        # response variant selected.
        OpenStackIdentity_3_0_MockHttp.type = None
        OpenStackIdentity_3_0_Connection_OIDC_access_token.conn_class = (
            OpenStackIdentity_3_0_MockHttp
        )

        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "tenant_name": "oidc",
            "domain_name": "project_id2",
        }
        self.auth_instance = OpenStackIdentity_3_0_Connection_OIDC_access_token(
            **connection_kwargs
        )
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

    def test_authenticate_valid_project_id(self):
        # "project_id2" must authenticate without raising.
        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "token_scope": "project",
            "tenant_name": "oidc",
            "domain_name": "project_id2",
        }
        connection = OpenStackIdentity_3_0_Connection_OIDC_access_token(**connection_kwargs)
        connection.authenticate()

    def test_authenticate_invalid_project_id(self):
        # "project_id100" must be rejected with a ValueError naming the project.
        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": "idp",
            "key": "token",
            "token_scope": "project",
            "tenant_name": "oidc",
            "domain_name": "project_id100",
        }
        connection = OpenStackIdentity_3_0_Connection_OIDC_access_token(**connection_kwargs)

        expected_msg = "Project project_id100 not found"
        self.assertRaisesRegex(ValueError, expected_msg, connection.authenticate)
class OpenStackIdentity_2_0_Connection_VOMSTests(unittest.TestCase):
    """Tests for ``OpenStackIdentity_2_0_Connection_VOMS``."""

    def setUp(self):
        # Route the VOMS connection class through its mock with no response
        # variant selected.
        OpenStackIdentity_2_0_Connection_VOMSMockHttp.type = None
        OpenStackIdentity_2_0_Connection_VOMS.conn_class = (
            OpenStackIdentity_2_0_Connection_VOMSMockHttp
        )

        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": None,
            "key": "/tmp/proxy.pem",
            "tenant_name": "VO",
        }
        self.auth_instance = OpenStackIdentity_2_0_Connection_VOMS(**connection_kwargs)
        # Placeholder token set directly on the instance.
        self.auth_instance.auth_token = "mock"

    def test_authenticate(self):
        # Authentication against the mock must complete without raising.
        connection_kwargs = {
            "auth_url": "http://none",
            "user_id": None,
            "key": "/tmp/proxy.pem",
            "token_scope": "test",
            "tenant_name": "VO",
        }
        connection = OpenStackIdentity_2_0_Connection_VOMS(**connection_kwargs)
        connection.authenticate()
class OpenStackServiceCatalogTestCase(unittest.TestCase):
    """Tests for ``OpenStackServiceCatalog`` parsing and lookup helpers."""

    fixtures = ComputeFileFixtures("openstack")

    def _catalog_from_fixture(self, fixture_name, key_path, auth_version):
        """Build a service catalog from a JSON fixture.

        :param fixture_name: Name of the fixture file to load.
        :param key_path: Sequence of keys leading from the decoded JSON
            document to the raw service catalog.
        :param auth_version: Value passed through as ``auth_version``.
        :return: The constructed ``OpenStackServiceCatalog``.
        """
        node = json.loads(self.fixtures.load(fixture_name))
        for key in key_path:
            node = node[key]
        return OpenStackServiceCatalog(service_catalog=node, auth_version=auth_version)

    def _v2_catalog(self):
        """Return the catalog built from the v2.0 auth fixture."""
        return self._catalog_from_fixture(
            "_v2_0__auth.json", ("access", "serviceCatalog"), "2.0"
        )

    def test_parsing_auth_v1_1(self):
        catalog = self._catalog_from_fixture("_v1_1__auth.json", ("auth", "serviceCatalog"), "1.0")

        entries = catalog.get_entries()
        self.assertEqual(len(entries), 3)

        cdn_entries = [e for e in entries if e.service_type == "cloudFilesCDN"]
        entry = cdn_entries[0]
        self.assertEqual(entry.service_type, "cloudFilesCDN")
        self.assertIsNone(entry.service_name)
        self.assertEqual(len(entry.endpoints), 2)

        first, second = entry.endpoints[0], entry.endpoints[1]
        self.assertEqual(first.region, "ORD")
        self.assertEqual(first.url, "https://cdn2.clouddrive.com/v1/MossoCloudFS")
        self.assertEqual(first.endpoint_type, "external")
        self.assertEqual(second.region, "LON")
        self.assertEqual(second.endpoint_type, "external")

    def test_parsing_auth_v2(self):
        catalog = self._v2_catalog()

        entries = catalog.get_entries()
        self.assertEqual(len(entries), 10)

        server_entries = [e for e in entries if e.service_name == "cloudServers"]
        entry = server_entries[0]
        self.assertEqual(entry.service_type, "compute")
        self.assertEqual(entry.service_name, "cloudServers")
        self.assertEqual(len(entry.endpoints), 1)

        endpoint = entry.endpoints[0]
        self.assertIsNone(endpoint.region)
        self.assertEqual(endpoint.url, "https://servers.api.rackspacecloud.com/v1.0/1337")
        self.assertEqual(endpoint.endpoint_type, "external")

    def test_parsing_auth_v3(self):
        catalog = self._catalog_from_fixture("_v3__auth.json", ("token", "catalog"), "3.x")

        entries = catalog.get_entries()
        self.assertEqual(len(entries), 6)

        volume_entries = [e for e in entries if e.service_type == "volume"]
        entry = volume_entries[0]
        self.assertEqual(entry.service_type, "volume")
        self.assertIsNone(entry.service_name)
        self.assertEqual(len(entry.endpoints), 3)

        # Endpoints are expected in this order, all in the same region.
        for position, expected_type in enumerate(("external", "admin", "internal")):
            self.assertEqual(entry.endpoints[position].region, "regionOne")
            self.assertEqual(entry.endpoints[position].endpoint_type, expected_type)

    def test_get_public_urls(self):
        catalog = self._v2_catalog()

        public_urls = catalog.get_public_urls(service_type="object-store")
        expected_urls = [
            "https://storage101.lon1.clouddrive.com/v1/MossoCloudFS_11111-111111111-1111111111-1111111",
            "https://storage101.ord1.clouddrive.com/v1/MossoCloudFS_11111-111111111-1111111111-1111111",
        ]
        self.assertEqual(public_urls, expected_urls)

    def test_get_regions(self):
        catalog = self._v2_catalog()

        self.assertEqual(catalog.get_regions(service_type="object-store"), ["LON", "ORD"])
        # An unknown service type yields an empty list.
        self.assertEqual(catalog.get_regions(service_type="invalid"), [])

    def test_get_service_types(self):
        catalog = self._v2_catalog()

        all_types = catalog.get_service_types()
        self.assertEqual(
            all_types,
            [
                "compute",
                "image",
                "network",
                "object-store",
                "rax:object-cdn",
                "volumev2",
                "volumev3",
            ],
        )

        ord_types = catalog.get_service_types(region="ORD")
        self.assertEqual(ord_types, ["rax:object-cdn"])

    def test_get_service_names(self):
        catalog = self._v2_catalog()

        all_names = catalog.get_service_names()
        self.assertEqual(
            all_names,
            [
                "cinderv2",
                "cinderv3",
                "cloudFiles",
                "cloudFilesCDN",
                "cloudServers",
                "cloudServersOpenStack",
                "cloudServersPreprod",
                "glance",
                "neutron",
                "nova",
            ],
        )

        compute_names = catalog.get_service_names(service_type="compute")
        self.assertEqual(
            compute_names,
            ["cloudServers", "cloudServersOpenStack", "cloudServersPreprod", "nova"],
        )
class OpenStackIdentity_2_0_MockHttp(MockHttp):
    """Mock HTTP class answering Identity v2 requests from fixture files."""

    fixtures = ComputeFileFixtures("openstack_identity/v2")
    json_content_headers = {"content-type": "application/json; charset=UTF-8"}

    def _v2_0_tenants(self, method, url, body, headers):
        # Only GET is mocked; it replies with the canned tenant list.
        if method != "GET":
            raise NotImplementedError()

        status = httplib.OK
        payload = self.fixtures.load("v2_0_tenants.json")
        return (status, payload, self.json_content_headers, httplib.responses[status])
class OpenStackIdentity_3_0_MockHttp(MockHttp):
fixtures = ComputeFileFixtures("openstack_identity/v3")
json_content_headers = {"content-type": "application/json; charset=UTF-8"}
def _v3(self, method, url, body, headers):
    # Only GET is mocked; it replies with the canned versions document.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_versions.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_domains(self, method, url, body, headers):
    # Only GET is mocked; it replies with the canned domain list.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_domains.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_projects(self, method, url, body, headers):
    # Only GET is mocked; it replies with the canned project list.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_projects.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_projects_UNAUTHORIZED(self, method, url, body, headers):
    # "UNAUTHORIZED" variant of the project listing: GET is answered with a
    # 401 whose body is the v3 auth fixture; other verbs are not mocked.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.UNAUTHORIZED
    payload = ComputeFileFixtures("openstack").load("_v3__auth.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_OS_FEDERATION_identity_providers_test_user_id_protocols_test_tenant_auth(
    self, method, url, body, headers
):
    # Mock of the federation auth endpoint for identity provider
    # "test_user_id" and protocol "test_tenant". Only GET is mocked:
    #   * no Authorization header       -> 401 with an empty body
    #   * "Bearer test_key"             -> 200 with the v3 auth fixture and
    #                                      an "x-subject-token" header
    #   * any other Authorization value -> 401 with an empty JSON object
    # The 401 replies echo the request headers back as response headers.
    if method != "GET":
        raise NotImplementedError()

    # The reason phrase of each reply matches its status code.
    unauthorized_reason = httplib.responses[httplib.UNAUTHORIZED]

    if "Authorization" not in headers:
        return (httplib.UNAUTHORIZED, "", headers, unauthorized_reason)

    if headers["Authorization"] == "Bearer test_key":
        response_body = ComputeFileFixtures("openstack").load("_v3__auth.json")
        response_headers = {
            "Content-Type": "application/json",
            "x-subject-token": "foo-bar",
        }
        return (
            httplib.OK,
            response_body,
            response_headers,
            httplib.responses[httplib.OK],
        )

    return (httplib.UNAUTHORIZED, "{}", headers, unauthorized_reason)
def _v3_auth_tokens(self, method, url, body, headers):
    # Mock of /v3/auth/tokens.
    #   GET  -> 200 with the v3 auth fixture, its expiry rewritten to
    #           TOMORROW, plus an "x-subject-token" header.
    #   POST -> the v3 auth fixture plus an "x-subject-token" header. For
    #           password authentication the status is 401 unless the user
    #           domain name is "test_domain" and the project domain id is
    #           "test_tenant_domain_id"; otherwise the status is 200.
    # Any other verb is not mocked.
    if method == "GET":
        token_doc = json.loads(ComputeFileFixtures("openstack").load("_v3__auth.json"))
        token_doc["token"]["expires_at"] = TOMORROW.isoformat()

        response_headers = self.json_content_headers.copy()
        response_headers["x-subject-token"] = "00000000000000000000000000000000"
        return (
            httplib.OK,
            json.dumps(token_doc),
            response_headers,
            httplib.responses[httplib.OK],
        )

    if method == "POST":
        status = httplib.OK
        data = json.loads(body)

        if "password" in data["auth"]["identity"]:
            if (
                data["auth"]["identity"]["password"]["user"]["domain"]["name"] != "test_domain"
                or data["auth"]["scope"]["project"]["domain"]["id"] != "test_tenant_domain_id"
            ):
                status = httplib.UNAUTHORIZED

        response_body = ComputeFileFixtures("openstack").load("_v3__auth.json")
        response_headers = self.json_content_headers.copy()
        response_headers["x-subject-token"] = "00000000000000000000000000000000"
        # Reason phrase follows the status actually returned (200 or 401).
        return (status, response_body, response_headers, httplib.responses[status])

    raise NotImplementedError()
def _v3_auth_tokens_GET_UNAUTHORIZED_POST_OK(self, method, url, body, headers):
    # "GET_UNAUTHORIZED_POST_OK" variant of /v3/auth/tokens: POST is handed
    # to the regular handler, GET is answered with a 401 and the
    # "unauthorized" fixture, and other verbs are not mocked.
    if method == "POST":
        return self._v3_auth_tokens(method, url, body, headers)

    if method != "GET":
        raise NotImplementedError()

    status = httplib.UNAUTHORIZED
    rejection = ComputeFileFixtures("openstack").load("_v3__auth_unauthorized.json")
    return (status, rejection, self.json_content_headers, httplib.responses[status])
def _v3_users(self, method, url, body, headers):
    # verb -> (status, fixture): GET lists users, POST creates a user.
    canned = {
        "GET": (httplib.OK, "v3_users.json"),
        "POST": (httplib.CREATED, "v3_create_user.json"),
    }
    if method not in canned:
        raise NotImplementedError()

    status, fixture_name = canned[method]
    payload = self.fixtures.load(fixture_name)
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_users_a(self, method, url, body, headers):
    # GET (look up user "a") and PATCH (enable / disable user "a") both
    # reply with the same canned user document.
    if method not in ("GET", "PATCH"):
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_users_a.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_users_b(self, method, url, body, headers):
    # Only GET (look up user "b") is mocked.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_users_b.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_users_c(self, method, url, body, headers):
    # Only GET (look up user "c") is mocked.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_users_c.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_roles(self, method, url, body, headers):
    # Only GET is mocked; it replies with the canned role list.
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_roles.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_domains_default_users_a_roles_a(self, method, url, body, headers):
    """
    Acknowledge granting (PUT) or revoking (DELETE) a domain role.

    Both operations answer with NO_CONTENT and an empty body. Any other
    method raises ``NotImplementedError``.
    """
    if method not in ("PUT", "DELETE"):
        raise NotImplementedError()

    status = httplib.NO_CONTENT
    return (status, "", self.json_content_headers, httplib.responses[status])
def _v3_projects_a_users_a_roles_a(self, method, url, body, headers):
    """
    Acknowledge granting (PUT) or revoking (DELETE) a project role.

    Both operations answer with NO_CONTENT and an empty body. Any other
    method raises ``NotImplementedError``.
    """
    if method not in ("PUT", "DELETE"):
        raise NotImplementedError()

    status = httplib.NO_CONTENT
    return (status, "", self.json_content_headers, httplib.responses[status])
def _v3_domains_default(self, method, url, body, headers):
    """
    Get the domain: GET answers OK with ``v3_domains_default.json``.

    Any other method raises ``NotImplementedError``.
    """
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_domains_default.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_users_a_projects(self, method, url, body, headers):
    """
    Get the user's projects: GET answers OK with ``v3_users_a_projects.json``.

    Any other method raises ``NotImplementedError``.
    """
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_users_a_projects.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_domains_default_users_a_roles(self, method, url, body, headers):
    """
    Get the user's domain roles.

    GET answers OK with the ``v3_domains_default_users_a_roles.json``
    fixture. Any other method raises ``NotImplementedError``.
    """
    if method != "GET":
        raise NotImplementedError()

    status = httplib.OK
    payload = self.fixtures.load("v3_domains_default_users_a_roles.json")
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_OS_FEDERATION_identity_providers_idp_protocols_oidc_auth(
    self, method, url, body, headers
):
    """
    Answer the OIDC federation auth GET with a fixed subject token.

    The incoming ``body`` is returned unchanged. The response headers are
    a copy of ``self.json_content_headers`` with ``x-subject-token``
    added, so the shared header dict is never mutated. Any other method
    raises ``NotImplementedError``.
    """
    if method != "GET":
        raise NotImplementedError()

    response_headers = self.json_content_headers.copy()
    response_headers.update({"x-subject-token": "00000000000000000000000000000000"})
    status = httplib.OK
    return (status, body, response_headers, httplib.responses[status])
def _v3_OS_FEDERATION_projects(self, method, url, body, headers):
    """
    Get the user's projects as an inline JSON document.

    GET answers OK with two projects (``project_id`` / ``project_id2``).
    Any other method raises ``NotImplementedError``.
    """
    if method != "GET":
        raise NotImplementedError()

    known_projects = (
        ("project_id", "project_name"),
        ("project_id2", "project_name2"),
    )
    listing = [{"id": ident, "name": label} for ident, label in known_projects]
    payload = json.dumps({"projects": listing})
    status = httplib.OK
    return (status, payload, self.json_content_headers, httplib.responses[status])
def _v3_auth_projects(self, method, url, body, headers):
    """
    Get the user's projects as an inline JSON document.

    GET answers OK with two projects (``project_id`` / ``project_id2``).
    Any other method raises ``NotImplementedError``.
    """
    if method != "GET":
        raise NotImplementedError()

    first = {"id": "project_id", "name": "project_name"}
    second = {"id": "project_id2", "name": "project_name2"}
    payload = json.dumps({"projects": [first, second]})
    status = httplib.OK
    return (status, payload, self.json_content_headers, httplib.responses[status])
class OpenStackIdentity_3_0_AppCred_MockHttp(OpenStackIdentity_3_0_MockHttp):
    """
    Identity v3 mock whose token endpoint validates application credentials.
    """

    def _v3_auth_tokens(self, method, url, body, headers):
        """
        Validate an application-credential token request.

        The POSTed JSON must list ``application_credential`` among the
        identity methods and carry the id / secret pair
        ``appcred_id`` / ``appcred_secret``; otherwise the status is
        UNAUTHORIZED. The ``_v3__auth.json`` fixture and a fixed
        ``x-subject-token`` header are returned in both cases.

        :return: ``(status, body, headers, reason)`` tuple where the reason
                 phrase matches the returned status.
        :raises NotImplementedError: for any method other than POST.
        """
        if method != "POST":
            raise NotImplementedError()

        status = httplib.OK
        data = json.loads(body)
        identity = data["auth"]["identity"]
        if "application_credential" not in identity["methods"]:
            status = httplib.UNAUTHORIZED
        else:
            appcred = identity["application_credential"]
            if appcred["id"] != "appcred_id" or appcred["secret"] != "appcred_secret":
                status = httplib.UNAUTHORIZED

        body = ComputeFileFixtures("openstack").load("_v3__auth.json")
        # Copy so the shared class-level header dict is not mutated.
        headers = self.json_content_headers.copy()
        headers["x-subject-token"] = "00000000000000000000000000000000"
        # The reason phrase follows the actual status; previously a 401
        # was always reported with the "OK" phrase.
        return (status, body, headers, httplib.responses[status])
class OpenStackIdentity_3_0_federation_projects_MockHttp(OpenStackIdentity_3_0_MockHttp):
    """
    Identity v3 mock where the federation project listing works but the
    ``_v3_auth_projects`` handler always fails with a server error.
    """

    fixtures = ComputeFileFixtures("openstack_identity/v3")
    json_content_headers = {"content-type": "application/json; charset=UTF-8"}

    def _v3_OS_FEDERATION_projects(self, method, url, body, headers):
        """
        Get the user's projects: GET answers OK with a single project id.

        Any other method raises ``NotImplementedError``.
        """
        if method != "GET":
            raise NotImplementedError()

        payload = json.dumps({"projects": [{"id": "project_id"}]})
        status = httplib.OK
        return (status, payload, self.json_content_headers, httplib.responses[status])

    def _v3_auth_projects(self, method, url, body, headers):
        """
        Answer every request with INTERNAL_SERVER_ERROR, echoing ``body``.
        """
        failure = httplib.INTERNAL_SERVER_ERROR
        return (failure, body, self.json_content_headers, httplib.responses[failure])
class OpenStackIdentity_2_0_Connection_VOMSMockHttp(MockHttp):
    """
    Identity v2.0 mock whose token endpoint only authorizes VOMS requests.
    """

    fixtures = ComputeFileFixtures("openstack_identity/v2")
    json_content_headers = {"content-type": "application/json; charset=UTF-8"}

    def _v2_0_tokens(self, method, url, body, headers):
        """
        Issue a token only when the request asks for VOMS authentication.

        The POSTed JSON must contain ``"voms": true`` under ``auth``;
        otherwise the status is UNAUTHORIZED. The ``_v2_0__auth.json``
        fixture and a fixed ``x-subject-token`` header are returned in
        both cases.

        :return: ``(status, body, headers, reason)`` tuple where the reason
                 phrase matches the returned status.
        :raises NotImplementedError: for any method other than POST.
        """
        if method != "POST":
            raise NotImplementedError()

        status = httplib.UNAUTHORIZED
        auth = json.loads(body)["auth"]
        # Only a literal JSON ``true`` is accepted, not merely a truthy value.
        if "voms" in auth and auth["voms"] is True:
            status = httplib.OK

        body = ComputeFileFixtures("openstack").load("_v2_0__auth.json")
        # Copy so the shared class-level header dict is not mutated.
        headers = self.json_content_headers.copy()
        headers["x-subject-token"] = "00000000000000000000000000000000"
        # The reason phrase follows the actual status; previously a 401
        # was always reported with the "OK" phrase.
        return (status, body, headers, httplib.responses[status])

    def _v2_0_tenants(self, method, url, body, headers):
        """
        Get the user's projects: GET answers OK with one tenant name.

        :raises NotImplementedError: for any method other than GET.
        """
        if method != "GET":
            raise NotImplementedError()

        body = json.dumps({"tenant": [{"name": "tenant_name"}]})
        return (
            httplib.OK,
            body,
            self.json_content_headers,
            httplib.responses[httplib.OK],
        )
if __name__ == "__main__":
    # Script entry point: run the tests in this module and pass whatever
    # the runner returns on as the process exit status.
    outcome = unittest.main()
    sys.exit(outcome)