blob: e924daa97e26b1eae0f06d73e940874f3331a0a6 [file] [log] [blame]
# Copyright 2021 The casbin Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
from unittest import IsolatedAsyncioTestCase
import casbin
from tests.test_enforcer import get_examples, TestCaseBase
class TestManagementApi(TestCaseBase):
def get_enforcer(self, model=None, adapter=None):
return casbin.Enforcer(
model,
adapter,
)
def test_get_list(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
# True,
)
self.assertEqual(e.get_all_subjects(), ["alice", "bob", "data2_admin"])
self.assertEqual(e.get_all_objects(), ["data1", "data2"])
self.assertEqual(e.get_all_actions(), ["read", "write"])
self.assertEqual(e.get_all_roles(), ["data2_admin"])
def test_get_list_with_domains(self):
e = self.get_enforcer(
get_examples("rbac_with_domains_model.conf"),
get_examples("rbac_with_domains_policy.csv"),
# True,
)
self.assertEqual(e.get_all_subjects(), ["admin"])
self.assertEqual(e.get_all_objects(), ["data1", "data2"])
self.assertEqual(e.get_all_actions(), ["read", "write"])
self.assertEqual(e.get_all_roles(), ["admin"])
def test_get_policy_api(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
)
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
],
)
self.assertEqual(e.get_filtered_policy(0, "alice"), [["alice", "data1", "read"]])
self.assertEqual(e.get_filtered_policy(0, "bob"), [["bob", "data2", "write"]])
self.assertEqual(
e.get_filtered_policy(0, "data2_admin"),
[["data2_admin", "data2", "read"], ["data2_admin", "data2", "write"]],
)
self.assertEqual(e.get_filtered_policy(1, "data1"), [["alice", "data1", "read"]])
self.assertEqual(
e.get_filtered_policy(1, "data2"),
[
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
],
)
self.assertEqual(
e.get_filtered_policy(2, "read"),
[["alice", "data1", "read"], ["data2_admin", "data2", "read"]],
)
self.assertEqual(
e.get_filtered_policy(2, "write"),
[["bob", "data2", "write"], ["data2_admin", "data2", "write"]],
)
self.assertEqual(
e.get_filtered_policy(0, "data2_admin", "data2"),
[["data2_admin", "data2", "read"], ["data2_admin", "data2", "write"]],
)
# Note: "" (empty string) in fieldValues means matching all values.
self.assertEqual(
e.get_filtered_policy(0, "data2_admin", "", "read"),
[["data2_admin", "data2", "read"]],
)
self.assertEqual(
e.get_filtered_policy(1, "data2", "write"),
[["bob", "data2", "write"], ["data2_admin", "data2", "write"]],
)
self.assertTrue(e.has_policy(["alice", "data1", "read"]))
self.assertTrue(e.has_policy(["bob", "data2", "write"]))
self.assertFalse(e.has_policy(["alice", "data2", "read"]))
self.assertFalse(e.has_policy(["bob", "data3", "write"]))
self.assertEqual(e.get_grouping_policy(), [["alice", "data2_admin"]])
self.assertEqual(e.get_filtered_grouping_policy(0, "alice"), [["alice", "data2_admin"]])
self.assertEqual(e.get_filtered_grouping_policy(0, "bob"), [])
self.assertEqual(e.get_filtered_grouping_policy(1, "data1_admin"), [])
self.assertEqual(e.get_filtered_grouping_policy(1, "data2_admin"), [["alice", "data2_admin"]])
# Note: "" (empty string) in fieldValues means matching all values.
self.assertEqual(
e.get_filtered_grouping_policy(0, "", "data2_admin"),
[["alice", "data2_admin"]],
)
self.assertTrue(e.has_grouping_policy(["alice", "data2_admin"]))
self.assertFalse(e.has_grouping_policy(["bob", "data2_admin"]))
def test_update_filtered_policies(self):
e = casbin.Enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
)
e.update_filtered_policies(
[
["data2_admin", "data3", "read"],
["data2_admin", "data3", "write"],
],
0,
"data2_admin",
)
self.assertTrue(e.enforce("data2_admin", "data3", "write"))
self.assertTrue(e.enforce("data2_admin", "data3", "read"))
def test_get_policy_matching_function(self):
e = self.get_enforcer(
get_examples("rbac_with_domain_and_policy_pattern_model.conf"),
get_examples("rbac_with_domain_and_policy_pattern_policy.csv"),
)
self.assertEqual(
e.get_policy(),
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.1", "data2", "write"],
],
)
km2_fn = casbin.util.key_match2_func
self.assertEqual(
e.get_filtered_grouping_policy(2, partial(km2_fn, "domain.3")),
[["alice", "user", "*"], ["bob", "admin", "domain.3"]],
)
self.assertEqual(
e.get_filtered_grouping_policy(2, partial(km2_fn, "domain.1")),
[["alice", "user", "*"]],
)
# first and second p record matches to domain.3
self.assertEqual(
e.get_filtered_policy(1, partial(km2_fn, "domain.3")),
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
],
)
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.1"), "", "read")),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
def test_get_policy_multiple_matching_functions(self):
e = self.get_enforcer(
get_examples("rbac_with_domain_and_policy_pattern_model.conf"),
get_examples("rbac_with_domain_and_policy_pattern_policy.csv"),
)
self.assertEqual(
e.get_policy(),
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.1", "data2", "write"],
],
)
km2_fn = casbin.util.key_match2_func
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.2"), lambda a: "data" in a)),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.1"), lambda a: "data" in a, "read")),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.1"), "", "reading".startswith)),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
def test_modify_policy_api(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
# True,
)
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
],
)
e.add_policy("eve", "data3", "read")
e.add_named_policy("p", ["eve", "data3", "write"])
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
["eve", "data3", "read"],
["eve", "data3", "write"],
],
)
rules = [
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
]
named_policies = [
["jack", "data4", "write"],
["katy", "data4", "read"],
["leyo", "data4", "write"],
["ham", "data4", "read"],
]
e.add_policies(rules)
e.add_named_policies("p", named_policies)
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
["eve", "data3", "read"],
["eve", "data3", "write"],
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
["jack", "data4", "write"],
["katy", "data4", "read"],
["leyo", "data4", "write"],
["ham", "data4", "read"],
],
)
e.remove_policies(rules)
e.remove_named_policies("p", named_policies)
e.add_named_policy("p", "testing")
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
["eve", "data3", "read"],
["eve", "data3", "write"],
["testing"],
],
)
class TestManagementApiSynced(TestManagementApi):
def get_enforcer(self, model=None, adapter=None):
return casbin.SyncedEnforcer(
model,
adapter,
)
class TestManagementApiAsync(IsolatedAsyncioTestCase):
def get_enforcer(self, model=None, adapter=None):
return casbin.AsyncEnforcer(
model,
adapter,
)
async def test_get_list(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
# True,
)
await e.load_policy()
self.assertEqual(e.get_all_subjects(), ["alice", "bob", "data2_admin"])
self.assertEqual(e.get_all_objects(), ["data1", "data2"])
self.assertEqual(e.get_all_actions(), ["read", "write"])
self.assertEqual(e.get_all_roles(), ["data2_admin"])
async def test_get_policy_api(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
)
await e.load_policy()
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
],
)
self.assertEqual(e.get_filtered_policy(0, "alice"), [["alice", "data1", "read"]])
self.assertEqual(e.get_filtered_policy(0, "bob"), [["bob", "data2", "write"]])
self.assertEqual(
e.get_filtered_policy(0, "data2_admin"),
[["data2_admin", "data2", "read"], ["data2_admin", "data2", "write"]],
)
self.assertEqual(e.get_filtered_policy(1, "data1"), [["alice", "data1", "read"]])
self.assertEqual(
e.get_filtered_policy(1, "data2"),
[
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
],
)
self.assertEqual(
e.get_filtered_policy(2, "read"),
[["alice", "data1", "read"], ["data2_admin", "data2", "read"]],
)
self.assertEqual(
e.get_filtered_policy(2, "write"),
[["bob", "data2", "write"], ["data2_admin", "data2", "write"]],
)
self.assertEqual(
e.get_filtered_policy(0, "data2_admin", "data2"),
[["data2_admin", "data2", "read"], ["data2_admin", "data2", "write"]],
)
# Note: "" (empty string) in fieldValues means matching all values.
self.assertEqual(
e.get_filtered_policy(0, "data2_admin", "", "read"),
[["data2_admin", "data2", "read"]],
)
self.assertEqual(
e.get_filtered_policy(1, "data2", "write"),
[["bob", "data2", "write"], ["data2_admin", "data2", "write"]],
)
self.assertTrue(e.has_policy(["alice", "data1", "read"]))
self.assertTrue(e.has_policy(["bob", "data2", "write"]))
self.assertFalse(e.has_policy(["alice", "data2", "read"]))
self.assertFalse(e.has_policy(["bob", "data3", "write"]))
self.assertEqual(e.get_grouping_policy(), [["alice", "data2_admin"]])
self.assertEqual(e.get_filtered_grouping_policy(0, "alice"), [["alice", "data2_admin"]])
self.assertEqual(e.get_filtered_grouping_policy(0, "bob"), [])
self.assertEqual(e.get_filtered_grouping_policy(1, "data1_admin"), [])
self.assertEqual(e.get_filtered_grouping_policy(1, "data2_admin"), [["alice", "data2_admin"]])
# Note: "" (empty string) in fieldValues means matching all values.
self.assertEqual(
e.get_filtered_grouping_policy(0, "", "data2_admin"),
[["alice", "data2_admin"]],
)
self.assertTrue(e.has_grouping_policy(["alice", "data2_admin"]))
self.assertFalse(e.has_grouping_policy(["bob", "data2_admin"]))
async def test_update_filtered_policies(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
)
await e.load_policy()
await e.update_filtered_policies(
[
["data2_admin", "data3", "read"],
["data2_admin", "data3", "write"],
],
0,
"data2_admin",
)
self.assertTrue(e.enforce("data2_admin", "data3", "write"))
self.assertTrue(e.enforce("data2_admin", "data3", "read"))
async def test_get_policy_matching_function(self):
e = self.get_enforcer(
get_examples("rbac_with_domain_and_policy_pattern_model.conf"),
get_examples("rbac_with_domain_and_policy_pattern_policy.csv"),
)
await e.load_policy()
self.assertEqual(
e.get_policy(),
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.1", "data2", "write"],
],
)
km2_fn = casbin.util.key_match2_func
self.assertEqual(
e.get_filtered_grouping_policy(2, partial(km2_fn, "domain.3")),
[["alice", "user", "*"], ["bob", "admin", "domain.3"]],
)
self.assertEqual(
e.get_filtered_grouping_policy(2, partial(km2_fn, "domain.1")),
[["alice", "user", "*"]],
)
# first and second p record matches to domain.3
self.assertEqual(
e.get_filtered_policy(1, partial(km2_fn, "domain.3")),
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
],
)
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.1"), "", "read")),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
async def test_get_policy_multiple_matching_functions(self):
e = self.get_enforcer(
get_examples("rbac_with_domain_and_policy_pattern_model.conf"),
get_examples("rbac_with_domain_and_policy_pattern_policy.csv"),
)
await e.load_policy()
self.assertEqual(
e.get_policy(),
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.1", "data2", "write"],
],
)
km2_fn = casbin.util.key_match2_func
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.2"), lambda a: "data" in a)),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.1"), lambda a: "data" in a, "read")),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
self.assertEqual(
sorted(e.get_filtered_policy(1, partial(km2_fn, "domain.1"), "", "reading".startswith)),
sorted(
[
["admin", "domain.*", "data1", "read"],
["user", "domain.1", "data2", "read"],
["user", "domain.*", "data3", "read"],
]
),
)
async def test_modify_policy_api(self):
e = self.get_enforcer(
get_examples("rbac_model.conf"),
get_examples("rbac_policy.csv"),
# True,
)
await e.load_policy()
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
],
)
await e.add_policy("eve", "data3", "read")
await e.add_named_policy("p", ["eve", "data3", "write"])
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
["eve", "data3", "read"],
["eve", "data3", "write"],
],
)
rules = [
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
]
named_policies = [
["jack", "data4", "write"],
["katy", "data4", "read"],
["leyo", "data4", "write"],
["ham", "data4", "read"],
]
await e.add_policies(rules)
await e.add_named_policies("p", named_policies)
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
["eve", "data3", "read"],
["eve", "data3", "write"],
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
["jack", "data4", "write"],
["katy", "data4", "read"],
["leyo", "data4", "write"],
["ham", "data4", "read"],
],
)
await e.remove_policies(rules)
await e.remove_named_policies("p", named_policies)
await e.add_named_policy("p", "testing")
self.assertEqual(
e.get_policy(),
[
["alice", "data1", "read"],
["bob", "data2", "write"],
["data2_admin", "data2", "read"],
["data2_admin", "data2", "write"],
["eve", "data3", "read"],
["eve", "data3", "write"],
["testing"],
],
)