blob: 858a7de480f009a7bb77e5353919ce52b83ecc4a [file] [log] [blame]
from casbin_redis_adapter.adapter import Adapter
from casbin_redis_adapter.adapter import CasbinRule
from unittest import TestCase
import redis
import casbin
import os
def get_fixture(path):
"""
get model path
"""
dir_path = os.path.split(os.path.realpath(__file__))[0] + "/"
return os.path.abspath(dir_path + path)
def get_enforcer():
adapter = Adapter("localhost", 6379, encoding="utf-8")
e = casbin.Enforcer(get_fixture("rbac_model.conf"), adapter)
model = e.get_model()
model.clear_policy()
model.add_policy("p", "p", ["alice", "data1", "read"])
adapter.save_policy(model)
model.clear_policy()
model.add_policy("p", "p", ["bob", "data2", "write"])
adapter.save_policy(model)
model.clear_policy()
model.add_policy("p", "p", ["data2_admin", "data2", "read"])
adapter.save_policy(model)
model.clear_policy()
model.add_policy("p", "p", ["data2_admin", "data2", "write"])
adapter.save_policy(model)
model.clear_policy()
model.add_policy("g", "g", ["alice", "data2_admin"])
adapter.save_policy(model)
return casbin.Enforcer(get_fixture("rbac_model.conf"), adapter)
def clear_db(dbname):
client = redis.Redis()
client.delete(dbname)
class TestConfig(TestCase):
"""
unittest
"""
def setUp(self):
clear_db("casbin_rules")
def tearDown(self):
clear_db("casbin_rules")
def test_enforcer_basic(self):
"""
test policy
"""
e = get_enforcer()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
def test_add_policy(self):
"""
test add_policy
"""
e = get_enforcer()
adapter = e.get_adapter()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
# test add_policy after insert 2 rules
adapter.add_policy(sec="p", ptype="p", rule=("alice", "data1", "write"))
adapter.add_policy(sec="p", ptype="p", rule=("bob", "data2", "read"))
# reload policies from database
e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
def test_remove_policy(self):
"""
test remove_policy
"""
e = get_enforcer()
adapter = e.get_adapter()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
# test remove_policy after delete a role definition
result = adapter.remove_policy(
sec="g", ptype="g", rule=("alice", "data2_admin")
)
# reload policies from database
e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
self.assertTrue(result)
def test_remove_policy_no_remove_when_rule_is_incomplete(self):
adapter = Adapter("localhost", 6379)
e = casbin.Enforcer(get_fixture("rbac_with_resources_roles.conf"), adapter)
adapter.add_policy(sec="p", ptype="p", rule=("alice", "data1", "write"))
adapter.add_policy(sec="p", ptype="p", rule=("alice", "data1", "read"))
adapter.add_policy(sec="p", ptype="p", rule=("bob", "data2", "read"))
adapter.add_policy(
sec="p", ptype="p", rule=("data_group_admin", "data_group", "write")
)
adapter.add_policy(sec="g", ptype="g", rule=("alice", "data_group_admin"))
adapter.add_policy(sec="g", ptype="g2", rule=("data2", "data_group"))
e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
# test remove_policy doesn't remove when given an incomplete policy
adapter.remove_policy(sec="p", ptype="p", rule=("alice", "data1"))
e.load_policy()
self.assertTrue(e.enforce("alice", "data1", "write"))
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
def test_save_policy(self):
"""
test save_policy
"""
e = get_enforcer()
self.assertFalse(e.enforce("alice", "data4", "read"))
model = e.get_model()
model.clear_policy()
model.add_policy("p", "p", ("alice", "data4", "read"))
adapter = e.get_adapter()
adapter.save_policy(model)
self.assertTrue(e.enforce("alice", "data4", "read"))
def test_remove_filtered_policy(self):
"""
test remove_filtered_policy
"""
e = get_enforcer()
adapter = e.get_adapter()
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("alice", "data2", "write"))
result = adapter.remove_filtered_policy("g", "g", 6, "alice", "data2_admin")
e.load_policy()
self.assertFalse(result)
result = adapter.remove_filtered_policy(
"g", "g", 0, *[f"v{i}" for i in range(7)]
)
e.load_policy()
self.assertFalse(result)
result = adapter.remove_filtered_policy("g", "g", 0, "alice", "data2_admin")
e.load_policy()
self.assertTrue(result)
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
self.assertFalse(e.enforce("bob", "data2", "read"))
self.assertTrue(e.enforce("bob", "data2", "write"))
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("alice", "data2", "write"))
def test_str(self):
"""
test __str__ function
"""
rule = CasbinRule(ptype="p", v0="alice", v1="data1", v2="read")
self.assertEqual(rule.__str__(), "p, alice, data1, read")
def test_dict(self):
"""
test dict function
"""
rule = CasbinRule(ptype="p", v0="alice", v1="data1", v2="read")
self.assertEqual(
rule.dict(), {"ptype": "p", "v0": "alice", "v1": "data1", "v2": "read"}
)
def test_repr(self):
"""
test __repr__ function
"""
rule = CasbinRule(ptype="p", v0="alice", v1="data1", v2="read")
self.assertEqual(repr(rule), '<CasbinRule :"p, alice, data1, read">')