blob: 11af3864fde20b959e15463c3cf2f0c6994969d7 [file] [log] [blame]
# Copyright 2021 The casbin Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from unittest import TestCase, IsolatedAsyncioTestCase
import casbin
from casbin import util
def get_examples(path):
examples_path = os.path.split(os.path.realpath(__file__))[0] + "/../examples/"
return os.path.abspath(examples_path + path)
class MockSub:
def __init__(self, name, age):
self.name = name
self.age = age
class TestCaseBase(TestCase):
def get_enforcer(self, model=None, adapter=None):
return casbin.Enforcer(
model,
adapter,
)
class TestConfig(TestCaseBase):
def test_enforcer_basic(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "write"))
def test_enforce_ex_basic(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
self.assertTupleEqual(e.enforce_ex("alice", "data1", "read"), (True, ["alice", "data1", "read"]))
self.assertTupleEqual(e.enforce_ex("alice", "data2", "read"), (False, []))
self.assertTupleEqual(e.enforce_ex("bob", "data2", "write"), (True, ["bob", "data2", "write"]))
self.assertTupleEqual(e.enforce_ex("bob", "data1", "write"), (False, []))
def test_batch_enforce(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
results = [True, False, True, False]
self.assertEqual(
e.batch_enforce(
[
("alice", "data1", "read"),
("alice", "data2", "read"),
("bob", "data2", "write"),
("bob", "data1", "write"),
]
),
results,
)
def test_model_set_load(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
if not isinstance(e, casbin.SyncedEnforcer):
e.set_model(None)
self.assertTrue(e.model is None)
# creating new model
e.load_model()
self.assertTrue(e.model is not None)
def test_enforcer_basic_without_spaces(self):
e = self.get_enforcer(
get_examples("basic_model_without_spaces.conf"),
get_examples("basic_policy.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
def test_enforce_basic_with_root(self):
e = self.get_enforcer(get_examples("basic_with_root_model.conf"), get_examples("basic_policy.csv"))
self.assertTrue(e.enforce("root", "any", "any"))
def test_enforce_basic_without_resources(self):
e = self.get_enforcer(
get_examples("basic_without_resources_model.conf"),
get_examples("basic_without_resources_policy.csv"),
)
self.assertTrue(e.enforce("alice", "read"))
self.assertFalse(e.enforce("alice", "write"))
self.assertTrue(e.enforce("bob", "write"))
self.assertFalse(e.enforce("bob", "read"))
def test_enforce_basic_without_users(self):
e = self.get_enforcer(
get_examples("basic_without_users_model.conf"),
get_examples("basic_without_users_policy.csv"),
)
self.assertTrue(e.enforce("data1", "read"))
self.assertFalse(e.enforce("data1", "write"))
self.assertTrue(e.enforce("data2", "write"))
self.assertFalse(e.enforce("data2", "read"))
def test_enforce_ip_match(self):
e = self.get_enforcer(get_examples("ipmatch_model.conf"), get_examples("ipmatch_policy.csv"))
self.assertTrue(e.enforce("192.168.2.1", "data1", "read"))
self.assertFalse(e.enforce("192.168.3.1", "data1", "read"))
def test_enforce_key_match(self):
e = self.get_enforcer(get_examples("keymatch_model.conf"), get_examples("keymatch_policy.csv"))
self.assertTrue(e.enforce("alice", "/alice_data/test", "GET"))
self.assertFalse(e.enforce("alice", "/bob_data/test", "GET"))
self.assertTrue(e.enforce("cathy", "/cathy_data", "GET"))
self.assertTrue(e.enforce("cathy", "/cathy_data", "POST"))
self.assertFalse(e.enforce("cathy", "/cathy_data/12", "POST"))
def test_enforce_key_match2(self):
e = self.get_enforcer(get_examples("keymatch2_model.conf"), get_examples("keymatch2_policy.csv"))
self.assertTrue(e.enforce("alice", "/alice_data/resource", "GET"))
self.assertTrue(e.enforce("alice", "/alice_data2/123/using/456", "GET"))
def test_enforce_key_match_custom_model(self):
e = self.get_enforcer(
get_examples("keymatch_custom_model.conf"),
get_examples("keymatch2_policy.csv"),
)
def custom_function(key1, key2):
if key1 == "/alice_data2/myid/using/res_id" and key2 == "/alice_data/:resource":
return True
elif key1 == "/alice_data2/myid/using/res_id" and key2 == "/alice_data2/:id/using/:resId":
return True
return False
e.add_function("keyMatchCustom", custom_function)
self.assertFalse(e.enforce("alice", "/alice_data2/myid", "GET"))
self.assertTrue(e.enforce("alice", "/alice_data2/myid/using/res_id", "GET"))
def test_enforce_glob_match(self):
e = self.get_enforcer(
get_examples("globmatch_model.conf"),
get_examples("globmatch_policy.csv"),
)
self.assertTrue(e.enforce("alice", "/alice_data/test_all", "GET"))
self.assertTrue(e.enforce("alice", "/alice_data/123", "POST"))
self.assertTrue(e.enforce("bob", "/alice_data/1", "GET"))
self.assertFalse(e.enforce("bob", "/alice_data/0", "GET"))
self.assertTrue(e.enforce("bob", "/bob_data/0", "POST"))
self.assertFalse(e.enforce("bob", "/bob_data/1", "POST"))
def test_enforce_priority(self):
e = self.get_enforcer(get_examples("priority_model.conf"), get_examples("priority_policy.csv"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertTrue(e.enforce("bob", "data2", "read"))
self.assertFalse(e.enforce("bob", "data2", "write"))
def test_enforce_priority_explicit(self):
e = self.get_enforcer(
get_examples("priority_model_explicit.conf"),
get_examples("priority_policy_explicit.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "read"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "write"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "read"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "write"))
e.add_policy("1", "bob", "data2", "write", "deny")
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertFalse(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "read"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "write"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "read"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "write"))
def test_enforce_priority_indeterminate(self):
e = self.get_enforcer(
get_examples("priority_model.conf"),
get_examples("priority_indeterminate_policy.csv"),
)
self.assertFalse(e.enforce("alice", "data1", "read"))
def test_enforce_subpriority(self):
e = self.get_enforcer(
get_examples("subject_priority_model.conf"),
get_examples("subject_priority_policy.csv"),
)
self.assertTrue(e.enforce("jane", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "read"))
def test_enforce_subpriority_with_domain(self):
e = self.get_enforcer(
get_examples("subject_priority_model_with_domain.conf"),
get_examples("subject_priority_policy_with_domain.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "domain1", "write"))
self.assertTrue(e.enforce("bob", "data2", "domain2", "write"))
def test_multiple_policy_definitions(self):
e = self.get_enforcer(
get_examples("multiple_policy_definitions_model.conf"),
get_examples("multiple_policy_definitions_policy.csv"),
)
enforce_context = e.new_enforce_context("2")
enforce_context.etype = "e"
sub1 = MockSub("alice", 70)
sub2 = MockSub("bob", 30)
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce(enforce_context, sub1, "/data1", "read"))
self.assertTrue(e.enforce(enforce_context, sub2, "/data1", "read"))
self.assertFalse(e.enforce(enforce_context, sub2, "/data1", "write"))
self.assertFalse(e.enforce(enforce_context, sub1, "/data2", "read"))
def test_enforce_rbac(self):
e = self.get_enforcer(get_examples("rbac_model.conf"), get_examples("rbac_policy.csv"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bogus", "data2", "write")) # test non-existant subject
def test_enforce_rbac_empty_policy(self):
e = self.get_enforcer(get_examples("rbac_model.conf"), get_examples("empty_policy.csv"))
self.assertFalse(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
def test_enforce_rbac_with_deny(self):
e = self.get_enforcer(
get_examples("rbac_with_deny_model.conf"),
get_examples("rbac_with_deny_policy.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
def test_enforce_rbac_with_domains(self):
e = self.get_enforcer(
get_examples("rbac_with_domains_model.conf"),
get_examples("rbac_with_domains_policy.csv"),
)
self.assertTrue(e.enforce("alice", "domain1", "data1", "read"))
self.assertTrue(e.enforce("alice", "domain1", "data1", "write"))
self.assertFalse(e.enforce("alice", "domain1", "data2", "read"))
self.assertFalse(e.enforce("alice", "domain1", "data2", "write"))
self.assertFalse(e.enforce("bob", "domain2", "data1", "read"))
self.assertFalse(e.enforce("bob", "domain2", "data1", "write"))
self.assertTrue(e.enforce("bob", "domain2", "data2", "read"))
self.assertTrue(e.enforce("bob", "domain2", "data2", "write"))
def test_enforce_rbac_with_not_deny(self):
e = self.get_enforcer(
get_examples("rbac_with_not_deny_model.conf"),
get_examples("rbac_with_deny_policy.csv"),
)
self.assertFalse(e.enforce("alice", "data2", "write"))
def test_enforce_rbac_with_resource_roles(self):
e = self.get_enforcer(
get_examples("rbac_with_resource_roles_model.conf"),
get_examples("rbac_with_resource_roles_policy.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
def test_enforce_rbac_with_pattern(self):
e = self.get_enforcer(
get_examples("rbac_with_pattern_model.conf"),
get_examples("rbac_with_pattern_policy.csv"),
)
# set matching function to key_match2
e.add_named_matching_func("g2", casbin.util.key_match2)
self.assertTrue(e.enforce("alice", "/book/1", "GET"))
self.assertTrue(e.enforce("alice", "/book/2", "GET"))
self.assertTrue(e.enforce("alice", "/pen/1", "GET"))
self.assertFalse(e.enforce("alice", "/pen/2", "GET"))
self.assertFalse(e.enforce("bob", "/book/1", "GET"))
self.assertFalse(e.enforce("bob", "/book/2", "GET"))
self.assertTrue(e.enforce("bob", "/pen/1", "GET"))
self.assertTrue(e.enforce("bob", "/pen/2", "GET"))
# replace key_match2 with key_match3
e.add_named_matching_func("g2", casbin.util.key_match3)
self.assertTrue(e.enforce("alice", "/book2/1", "GET"))
self.assertTrue(e.enforce("alice", "/book2/2", "GET"))
self.assertTrue(e.enforce("alice", "/pen2/1", "GET"))
self.assertFalse(e.enforce("alice", "/pen2/2", "GET"))
self.assertFalse(e.enforce("bob", "/book2/1", "GET"))
self.assertFalse(e.enforce("bob", "/book2/2", "GET"))
self.assertTrue(e.enforce("bob", "/pen2/1", "GET"))
self.assertTrue(e.enforce("bob", "/pen2/2", "GET"))
def test_rbac_with_multipy_matched_pattern(self):
e = self.get_enforcer(
get_examples("rbac_with_multiply_matched_pattern.conf"),
get_examples("rbac_with_multiply_matched_pattern.csv"),
)
e.add_named_matching_func("g2", casbin.util.glob_match)
self.assertTrue(e.enforce("root@localhost", "/", "org.create"))
def test_enforce_abac_log_enabled(self):
e = self.get_enforcer(get_examples("abac_model.conf"))
sub = "alice"
obj = {"Owner": "alice", "id": "data1"}
self.assertTrue(e.enforce(sub, obj, "write"))
def test_abac_with_sub_rule(self):
e = self.get_enforcer(get_examples("abac_rule_model.conf"), get_examples("abac_rule_policy.csv"))
sub1 = MockSub("alice", 16)
sub2 = MockSub("bob", 20)
sub3 = MockSub("alice", 65)
self.assertFalse(e.enforce(sub1, "/data1", "read"))
self.assertFalse(e.enforce(sub1, "/data2", "read"))
self.assertFalse(e.enforce(sub1, "/data1", "write"))
self.assertTrue(e.enforce(sub1, "/data2", "write"))
self.assertTrue(e.enforce(sub2, "/data1", "read"))
self.assertFalse(e.enforce(sub2, "/data2", "read"))
self.assertFalse(e.enforce(sub2, "/data1", "write"))
self.assertTrue(e.enforce(sub2, "/data2", "write"))
self.assertTrue(e.enforce(sub3, "/data1", "read"))
self.assertFalse(e.enforce(sub3, "/data2", "read"))
self.assertFalse(e.enforce(sub3, "/data1", "write"))
self.assertFalse(e.enforce(sub3, "/data2", "write"))
def test_abac_with_multiple_sub_rules(self):
e = self.get_enforcer(
get_examples("abac_multiple_rules_model.conf"),
get_examples("abac_multiple_rules_policy.csv"),
)
sub1 = MockSub("alice", 16)
sub2 = MockSub("alice", 20)
sub3 = MockSub("bob", 65)
sub4 = MockSub("bob", 35)
self.assertFalse(e.enforce(sub1, "/data1", "read"))
self.assertFalse(e.enforce(sub1, "/data2", "read"))
self.assertFalse(e.enforce(sub1, "/data1", "write"))
self.assertFalse(e.enforce(sub1, "/data2", "write"))
self.assertTrue(e.enforce(sub2, "/data1", "read"))
self.assertFalse(e.enforce(sub2, "/data2", "read"))
self.assertFalse(e.enforce(sub2, "/data1", "write"))
self.assertFalse(e.enforce(sub2, "/data2", "write"))
self.assertFalse(e.enforce(sub3, "/data1", "read"))
self.assertFalse(e.enforce(sub3, "/data2", "read"))
self.assertFalse(e.enforce(sub3, "/data1", "write"))
self.assertFalse(e.enforce(sub3, "/data2", "write"))
self.assertFalse(e.enforce(sub4, "/data1", "read"))
self.assertFalse(e.enforce(sub4, "/data2", "read"))
self.assertFalse(e.enforce(sub4, "/data1", "write"))
self.assertTrue(e.enforce(sub4, "/data2", "write"))
def test_matcher_using_in_operator_bracket(self):
e = self.get_enforcer(
get_examples("rbac_model_matcher_using_in_op_bracket.conf"),
get_examples("rbac_policy.csv"),
)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data3", "scribble"))
self.assertFalse(e.enforce("alice", "data4", "scribble"))
def test_temporal_roles_model(self):
e = self.get_enforcer(
get_examples("rbac_with_temporal_roles_model.conf"),
get_examples("rbac_with_temporal_roles_policy.csv"),
)
e.add_named_link_condition_func("g", "alice", "data2_admin", util.time_match_func)
e.add_named_link_condition_func("g", "alice", "data3_admin", util.time_match_func)
e.add_named_link_condition_func("g", "alice", "data4_admin", util.time_match_func)
e.add_named_link_condition_func("g", "alice", "data5_admin", util.time_match_func)
e.add_named_link_condition_func("g", "alice", "data6_admin", util.time_match_func)
e.add_named_link_condition_func("g", "alice", "data7_admin", util.time_match_func)
e.add_named_link_condition_func("g", "alice", "data8_admin", util.time_match_func)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
self.assertTrue(e.enforce("alice", "data3", "read"))
self.assertTrue(e.enforce("alice", "data3", "write"))
self.assertTrue(e.enforce("alice", "data4", "read"))
self.assertTrue(e.enforce("alice", "data4", "write"))
self.assertTrue(e.enforce("alice", "data5", "read"))
self.assertTrue(e.enforce("alice", "data5", "write"))
self.assertFalse(e.enforce("alice", "data6", "read"))
self.assertFalse(e.enforce("alice", "data6", "write"))
self.assertTrue(e.enforce("alice", "data7", "read"))
self.assertTrue(e.enforce("alice", "data7", "write"))
self.assertFalse(e.enforce("alice", "data8", "read"))
self.assertFalse(e.enforce("alice", "data8", "write"))
def test_temporal_roles_model_with_domain(self):
e = self.get_enforcer(
get_examples("rbac_with_domain_temporal_roles_model.conf"),
get_examples("rbac_with_domain_temporal_roles_policy.csv"),
)
e.add_named_domain_link_condition_func("g", "alice", "data2_admin", "domain2", util.time_match_func)
e.add_named_domain_link_condition_func("g", "alice", "data3_admin", "domain3", util.time_match_func)
e.add_named_domain_link_condition_func("g", "alice", "data4_admin", "domain4", util.time_match_func)
e.add_named_domain_link_condition_func("g", "alice", "data5_admin", "domain5", util.time_match_func)
e.add_named_domain_link_condition_func("g", "alice", "data6_admin", "domain6", util.time_match_func)
e.add_named_domain_link_condition_func("g", "alice", "data7_admin", "domain7", util.time_match_func)
e.add_named_domain_link_condition_func("g", "alice", "data8_admin", "domain8", util.time_match_func)
self.assertTrue(e.enforce("alice", "domain1", "data1", "read"))
self.assertTrue(e.enforce("alice", "domain1", "data1", "write"))
self.assertFalse(e.enforce("alice", "domain2", "data2", "read"))
self.assertFalse(e.enforce("alice", "domain2", "data2", "write"))
self.assertTrue(e.enforce("alice", "domain3", "data3", "read"))
self.assertTrue(e.enforce("alice", "domain3", "data3", "write"))
self.assertTrue(e.enforce("alice", "domain4", "data4", "read"))
self.assertTrue(e.enforce("alice", "domain4", "data4", "write"))
self.assertTrue(e.enforce("alice", "domain5", "data5", "read"))
self.assertTrue(e.enforce("alice", "domain5", "data5", "write"))
self.assertFalse(e.enforce("alice", "domain6", "data6", "read"))
self.assertFalse(e.enforce("alice", "domain6", "data6", "write"))
self.assertTrue(e.enforce("alice", "domain7", "data7", "read"))
self.assertTrue(e.enforce("alice", "domain7", "data7", "write"))
self.assertFalse(e.enforce("alice", "domain8", "data8", "read"))
self.assertFalse(e.enforce("alice", "domain8", "data8", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data1", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data1", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data2", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data2", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data3", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data3", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data4", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data4", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data5", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data5", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data6", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data6", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data7", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data7", "write"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data8", "read"))
self.assertFalse(e.enforce("alice", "domain_not_exist", "data8", "write"))
def test_link_condition_function(self):
e = self.get_enforcer(
get_examples("rbac_with_temporal_roles_model.conf"),
get_examples("rbac_with_temporal_roles_condition_policy.csv"),
)
true_func = lambda *args: True if args[0] == "_" or args[0] == "true" else False
false_func = lambda *args: True if args[0] == "_" or args[0] == "false" else False
e.add_named_link_condition_func("g", "alice", "data2_admin", true_func)
e.add_named_link_condition_func("g", "alice", "data3_admin", true_func)
e.add_named_link_condition_func("g", "alice", "data4_admin", false_func)
e.add_named_link_condition_func("g", "alice", "data5_admin", false_func)
e.set_named_link_condition_func_params("g", "alice", "data2_admin", "true")
e.set_named_link_condition_func_params("g", "alice", "data3_admin", "not true")
e.set_named_link_condition_func_params("g", "alice", "data4_admin", "false")
e.set_named_link_condition_func_params("g", "alice", "data5_admin", "not false")
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("alice", "data3", "read"))
self.assertFalse(e.enforce("alice", "data3", "write"))
self.assertTrue(e.enforce("alice", "data4", "read"))
self.assertTrue(e.enforce("alice", "data4", "write"))
self.assertFalse(e.enforce("alice", "data5", "read"))
self.assertFalse(e.enforce("alice", "data5", "write"))
e = self.get_enforcer(
get_examples("rbac_with_domain_temporal_roles_model.conf"),
get_examples("rbac_with_domain_temporal_roles_condition_policy.csv"),
)
e.add_named_domain_link_condition_func("g", "alice", "data2_admin", "domain2", true_func)
e.add_named_domain_link_condition_func("g", "alice", "data3_admin", "domain3", true_func)
e.add_named_domain_link_condition_func("g", "alice", "data4_admin", "domain4", false_func)
e.add_named_domain_link_condition_func("g", "alice", "data5_admin", "domain5", false_func)
e.set_named_domain_link_condition_func_params("g", "alice", "data2_admin", "domain2", "true")
e.set_named_domain_link_condition_func_params("g", "alice", "data3_admin", "domain3", "not true")
e.set_named_domain_link_condition_func_params("g", "alice", "data4_admin", "domain4", "false")
e.set_named_domain_link_condition_func_params("g", "alice", "data5_admin", "domain5", "not false")
self.assertTrue(e.enforce("alice", "domain1", "data1", "read"))
self.assertTrue(e.enforce("alice", "domain1", "data1", "write"))
self.assertTrue(e.enforce("alice", "domain2", "data2", "read"))
self.assertTrue(e.enforce("alice", "domain2", "data2", "write"))
self.assertFalse(e.enforce("alice", "domain3", "data3", "read"))
self.assertFalse(e.enforce("alice", "domain3", "data3", "write"))
self.assertTrue(e.enforce("alice", "domain4", "data4", "read"))
self.assertTrue(e.enforce("alice", "domain4", "data4", "write"))
self.assertFalse(e.enforce("alice", "domain5", "data5", "read"))
self.assertFalse(e.enforce("alice", "domain5", "data5", "write"))
class TestConfigSynced(TestConfig):
def get_enforcer(self, model=None, adapter=None):
return casbin.SyncedEnforcer(
model,
adapter,
)
def test_auto_loading_policy(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
e.start_auto_load_policy(5 / 1000)
self.assertTrue(e.is_auto_loading_running())
e.stop_auto_load_policy()
# thread needs a moment to exit
time.sleep(10 / 1000)
self.assertFalse(e.is_auto_loading_running())
class TestConfigAsync(IsolatedAsyncioTestCase):
def get_enforcer(self, model=None, adapter=None):
return casbin.AsyncEnforcer(
model,
adapter,
)
async def test_enforcer_basic(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "write"))
async def test_enforce_ex_basic(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()
self.assertTupleEqual(e.enforce_ex("alice", "data1", "read"), (True, ["alice", "data1", "read"]))
self.assertTupleEqual(e.enforce_ex("alice", "data2", "read"), (False, []))
self.assertTupleEqual(e.enforce_ex("bob", "data2", "write"), (True, ["bob", "data2", "write"]))
self.assertTupleEqual(e.enforce_ex("bob", "data1", "write"), (False, []))
async def test_batch_enforce(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()
results = [True, False, True, False]
self.assertEqual(
e.batch_enforce(
[
("alice", "data1", "read"),
("alice", "data2", "read"),
("bob", "data2", "write"),
("bob", "data1", "write"),
]
),
results,
)
async def test_model_set_load(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()
if not isinstance(e, casbin.SyncedEnforcer):
e.set_model(None)
self.assertTrue(e.model is None)
# creating new model
e.load_model()
self.assertTrue(e.model is not None)
async def test_enforcer_basic_without_spaces(self):
e = self.get_enforcer(
get_examples("basic_model_without_spaces.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
async def test_enforce_basic_with_root(self):
e = self.get_enforcer(get_examples("basic_with_root_model.conf"), get_examples("basic_policy.csv"))
await e.load_policy()
self.assertTrue(e.enforce("root", "any", "any"))
async def test_enforce_basic_without_resources(self):
e = self.get_enforcer(
get_examples("basic_without_resources_model.conf"),
get_examples("basic_without_resources_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "read"))
self.assertFalse(e.enforce("alice", "write"))
self.assertTrue(e.enforce("bob", "write"))
self.assertFalse(e.enforce("bob", "read"))
async def test_enforce_basic_without_users(self):
e = self.get_enforcer(
get_examples("basic_without_users_model.conf"),
get_examples("basic_without_users_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("data1", "read"))
self.assertFalse(e.enforce("data1", "write"))
self.assertTrue(e.enforce("data2", "write"))
self.assertFalse(e.enforce("data2", "read"))
async def test_enforce_ip_match(self):
e = self.get_enforcer(get_examples("ipmatch_model.conf"), get_examples("ipmatch_policy.csv"))
await e.load_policy()
self.assertTrue(e.enforce("192.168.2.1", "data1", "read"))
self.assertFalse(e.enforce("192.168.3.1", "data1", "read"))
async def test_enforce_key_match(self):
e = self.get_enforcer(get_examples("keymatch_model.conf"), get_examples("keymatch_policy.csv"))
await e.load_policy()
self.assertTrue(e.enforce("alice", "/alice_data/test", "GET"))
self.assertFalse(e.enforce("alice", "/bob_data/test", "GET"))
self.assertTrue(e.enforce("cathy", "/cathy_data", "GET"))
self.assertTrue(e.enforce("cathy", "/cathy_data", "POST"))
self.assertFalse(e.enforce("cathy", "/cathy_data/12", "POST"))
async def test_enforce_key_match2(self):
e = self.get_enforcer(get_examples("keymatch2_model.conf"), get_examples("keymatch2_policy.csv"))
await e.load_policy()
self.assertTrue(e.enforce("alice", "/alice_data/resource", "GET"))
self.assertTrue(e.enforce("alice", "/alice_data2/123/using/456", "GET"))
async def test_enforce_key_match_custom_model(self):
e = self.get_enforcer(
get_examples("keymatch_custom_model.conf"),
get_examples("keymatch2_policy.csv"),
)
await e.load_policy()
def custom_function(key1, key2):
if key1 == "/alice_data2/myid/using/res_id" and key2 == "/alice_data/:resource":
return True
elif key1 == "/alice_data2/myid/using/res_id" and key2 == "/alice_data2/:id/using/:resId":
return True
return False
e.add_function("keyMatchCustom", custom_function)
self.assertFalse(e.enforce("alice", "/alice_data2/myid", "GET"))
self.assertTrue(e.enforce("alice", "/alice_data2/myid/using/res_id", "GET"))
async def test_enforce_glob_match(self):
e = self.get_enforcer(
get_examples("globmatch_model.conf"),
get_examples("globmatch_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "/alice_data/test_all", "GET"))
self.assertTrue(e.enforce("alice", "/alice_data/123", "POST"))
self.assertTrue(e.enforce("bob", "/alice_data/1", "GET"))
self.assertFalse(e.enforce("bob", "/alice_data/0", "GET"))
self.assertTrue(e.enforce("bob", "/bob_data/0", "POST"))
self.assertFalse(e.enforce("bob", "/bob_data/1", "POST"))
async def test_enforce_priority(self):
e = self.get_enforcer(get_examples("priority_model.conf"), get_examples("priority_policy.csv"))
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertTrue(e.enforce("bob", "data2", "read"))
self.assertFalse(e.enforce("bob", "data2", "write"))
async def test_enforce_priority_explicit(self):
e = self.get_enforcer(
get_examples("priority_model_explicit.conf"),
get_examples("priority_policy_explicit.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "read"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "write"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "read"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "write"))
await e.add_policy("1", "bob", "data2", "write", "deny")
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertFalse(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "read"))
self.assertFalse(e.enforce("data1_deny_group", "data1", "write"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "read"))
self.assertTrue(e.enforce("data2_allow_group", "data2", "write"))
async def test_enforce_priority_indeterminate(self):
e = self.get_enforcer(
get_examples("priority_model.conf"),
get_examples("priority_indeterminate_policy.csv"),
)
await e.load_policy()
self.assertFalse(e.enforce("alice", "data1", "read"))
async def test_enforce_subpriority(self):
e = self.get_enforcer(
get_examples("subject_priority_model.conf"),
get_examples("subject_priority_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("jane", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "read"))
async def test_enforce_subpriority_with_domain(self):
e = self.get_enforcer(
get_examples("subject_priority_model_with_domain.conf"),
get_examples("subject_priority_policy_with_domain.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "domain1", "write"))
self.assertTrue(e.enforce("bob", "data2", "domain2", "write"))
async def test_multiple_policy_definitions(self):
e = self.get_enforcer(
get_examples("multiple_policy_definitions_model.conf"),
get_examples("multiple_policy_definitions_policy.csv"),
)
await e.load_policy()
enforce_context = e.new_enforce_context("2")
enforce_context.etype = "e"
sub1 = MockSub("alice", 70)
sub2 = MockSub("bob", 30)
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce(enforce_context, sub1, "/data1", "read"))
self.assertTrue(e.enforce(enforce_context, sub2, "/data1", "read"))
self.assertFalse(e.enforce(enforce_context, sub2, "/data1", "write"))
self.assertFalse(e.enforce(enforce_context, sub1, "/data2", "read"))
async def test_enforce_rbac(self):
e = self.get_enforcer(get_examples("rbac_model.conf"), get_examples("rbac_policy.csv"))
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bogus", "data2", "write")) # test non-existant subject
async def test_enforce_rbac_empty_policy(self):
e = self.get_enforcer(get_examples("rbac_model.conf"), get_examples("empty_policy.csv"))
await e.load_policy()
self.assertFalse(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
async def test_enforce_rbac_with_deny(self):
e = self.get_enforcer(
get_examples("rbac_with_deny_model.conf"),
get_examples("rbac_with_deny_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
async def test_enforce_rbac_with_domains(self):
e = self.get_enforcer(
get_examples("rbac_with_domains_model.conf"),
get_examples("rbac_with_domains_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "domain1", "data1", "read"))
self.assertTrue(e.enforce("alice", "domain1", "data1", "write"))
self.assertFalse(e.enforce("alice", "domain1", "data2", "read"))
self.assertFalse(e.enforce("alice", "domain1", "data2", "write"))
self.assertFalse(e.enforce("bob", "domain2", "data1", "read"))
self.assertFalse(e.enforce("bob", "domain2", "data1", "write"))
self.assertTrue(e.enforce("bob", "domain2", "data2", "read"))
self.assertTrue(e.enforce("bob", "domain2", "data2", "write"))
async def test_enforce_rbac_with_not_deny(self):
e = self.get_enforcer(
get_examples("rbac_with_not_deny_model.conf"),
get_examples("rbac_with_deny_policy.csv"),
)
await e.load_policy()
self.assertFalse(e.enforce("alice", "data2", "write"))
async def test_enforce_rbac_with_resource_roles(self):
e = self.get_enforcer(
get_examples("rbac_with_resource_roles_model.conf"),
get_examples("rbac_with_resource_roles_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
self.assertFalse(e.enforce("bob", "data1", "read"))
self.assertFalse(e.enforce("bob", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
async def test_enforce_rbac_with_pattern(self):
e = self.get_enforcer(
get_examples("rbac_with_pattern_model.conf"),
get_examples("rbac_with_pattern_policy.csv"),
)
await e.load_policy()
# set matching function to key_match2
e.add_named_matching_func("g2", casbin.util.key_match2)
self.assertTrue(e.enforce("alice", "/book/1", "GET"))
self.assertTrue(e.enforce("alice", "/book/2", "GET"))
self.assertTrue(e.enforce("alice", "/pen/1", "GET"))
self.assertFalse(e.enforce("alice", "/pen/2", "GET"))
self.assertFalse(e.enforce("bob", "/book/1", "GET"))
self.assertFalse(e.enforce("bob", "/book/2", "GET"))
self.assertTrue(e.enforce("bob", "/pen/1", "GET"))
self.assertTrue(e.enforce("bob", "/pen/2", "GET"))
# replace key_match2 with key_match3
e.add_named_matching_func("g2", casbin.util.key_match3)
self.assertTrue(e.enforce("alice", "/book2/1", "GET"))
self.assertTrue(e.enforce("alice", "/book2/2", "GET"))
self.assertTrue(e.enforce("alice", "/pen2/1", "GET"))
self.assertFalse(e.enforce("alice", "/pen2/2", "GET"))
self.assertFalse(e.enforce("bob", "/book2/1", "GET"))
self.assertFalse(e.enforce("bob", "/book2/2", "GET"))
self.assertTrue(e.enforce("bob", "/pen2/1", "GET"))
self.assertTrue(e.enforce("bob", "/pen2/2", "GET"))
async def test_rbac_with_multipy_matched_pattern(self):
e = self.get_enforcer(
get_examples("rbac_with_multiply_matched_pattern.conf"),
get_examples("rbac_with_multiply_matched_pattern.csv"),
)
await e.load_policy()
e.add_named_matching_func("g2", casbin.util.glob_match)
self.assertTrue(e.enforce("root@localhost", "/", "org.create"))
async def test_enforce_abac_log_enabled(self):
e = self.get_enforcer(get_examples("abac_model.conf"))
sub = "alice"
obj = {"Owner": "alice", "id": "data1"}
self.assertTrue(e.enforce(sub, obj, "write"))
async def test_abac_with_sub_rule(self):
e = self.get_enforcer(get_examples("abac_rule_model.conf"), get_examples("abac_rule_policy.csv"))
await e.load_policy()
sub1 = MockSub("alice", 16)
sub2 = MockSub("bob", 20)
sub3 = MockSub("alice", 65)
self.assertFalse(e.enforce(sub1, "/data1", "read"))
self.assertFalse(e.enforce(sub1, "/data2", "read"))
self.assertFalse(e.enforce(sub1, "/data1", "write"))
self.assertTrue(e.enforce(sub1, "/data2", "write"))
self.assertTrue(e.enforce(sub2, "/data1", "read"))
self.assertFalse(e.enforce(sub2, "/data2", "read"))
self.assertFalse(e.enforce(sub2, "/data1", "write"))
self.assertTrue(e.enforce(sub2, "/data2", "write"))
self.assertTrue(e.enforce(sub3, "/data1", "read"))
self.assertFalse(e.enforce(sub3, "/data2", "read"))
self.assertFalse(e.enforce(sub3, "/data1", "write"))
self.assertFalse(e.enforce(sub3, "/data2", "write"))
async def test_abac_with_multiple_sub_rules(self):
e = self.get_enforcer(
get_examples("abac_multiple_rules_model.conf"),
get_examples("abac_multiple_rules_policy.csv"),
)
await e.load_policy()
sub1 = MockSub("alice", 16)
sub2 = MockSub("alice", 20)
sub3 = MockSub("bob", 65)
sub4 = MockSub("bob", 35)
self.assertFalse(e.enforce(sub1, "/data1", "read"))
self.assertFalse(e.enforce(sub1, "/data2", "read"))
self.assertFalse(e.enforce(sub1, "/data1", "write"))
self.assertFalse(e.enforce(sub1, "/data2", "write"))
self.assertTrue(e.enforce(sub2, "/data1", "read"))
self.assertFalse(e.enforce(sub2, "/data2", "read"))
self.assertFalse(e.enforce(sub2, "/data1", "write"))
self.assertFalse(e.enforce(sub2, "/data2", "write"))
self.assertFalse(e.enforce(sub3, "/data1", "read"))
self.assertFalse(e.enforce(sub3, "/data2", "read"))
self.assertFalse(e.enforce(sub3, "/data1", "write"))
self.assertFalse(e.enforce(sub3, "/data2", "write"))
self.assertFalse(e.enforce(sub4, "/data1", "read"))
self.assertFalse(e.enforce(sub4, "/data2", "read"))
self.assertFalse(e.enforce(sub4, "/data1", "write"))
self.assertTrue(e.enforce(sub4, "/data2", "write"))
async def test_matcher_using_in_operator_bracket(self):
e = self.get_enforcer(
get_examples("rbac_model_matcher_using_in_op_bracket.conf"),
get_examples("rbac_policy.csv"),
)
await e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data3", "scribble"))
self.assertFalse(e.enforce("alice", "data4", "scribble"))