blob: 9dc257874c1cd33ca22a1304926815c3712e1f04 [file] [log] [blame]
# Copyright 2023 The casbin Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from pathlib import Path
from unittest import IsolatedAsyncioTestCase
import casbin
from sqlalchemy import Column, Boolean, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from casbin_async_sqlalchemy_adapter import Adapter
from casbin_async_sqlalchemy_adapter import Base
from casbin_async_sqlalchemy_adapter.adapter import Filter
class CasbinRuleSoftDelete(Base):
__tablename__ = "casbin_rule_soft_delete"
id = Column(Integer, primary_key=True)
ptype = Column(String(255))
v0 = Column(String(255))
v1 = Column(String(255))
v2 = Column(String(255))
v3 = Column(String(255))
v4 = Column(String(255))
v5 = Column(String(255))
is_deleted = Column(Boolean, default=False, index=True, nullable=False)
def __str__(self):
arr = [self.ptype]
for v in (self.v0, self.v1, self.v2, self.v3, self.v4, self.v5):
if v is None:
break
arr.append(v)
return ", ".join(arr)
def __repr__(self):
return '<CasbinRule {}: "{}">'.format(self.id, str(self))
async def query_for_rule(session, adapter, ptype, v0, v1, v2):
"""Helper function to query for a specific rule."""
rule_filter = Filter()
rule_filter.ptype = [ptype]
rule_filter.v0 = [v0]
rule_filter.v1 = [v1]
rule_filter.v2 = [v2]
stmt = select(CasbinRuleSoftDelete)
stmt = adapter.filter_query(stmt, rule_filter)
result = await session.execute(stmt)
return result.scalars().first()
def get_fixture(path):
dir_path = os.path.split(os.path.realpath(__file__))[0] + "/"
return os.path.abspath(dir_path + path)
class TestConfigSoftDelete(IsolatedAsyncioTestCase):
async def get_enforcer(self):
engine = create_async_engine("sqlite+aiosqlite://", future=True)
adapter = Adapter(engine, CasbinRuleSoftDelete, CasbinRuleSoftDelete.is_deleted)
await adapter.create_table()
async_session = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async with async_session() as s:
s.add(CasbinRuleSoftDelete(ptype="p", v0="alice", v1="data1", v2="read"))
s.add(CasbinRuleSoftDelete(ptype="p", v0="bob", v1="data2", v2="write"))
s.add(CasbinRuleSoftDelete(ptype="p", v0="data2_admin", v1="data2", v2="read"))
s.add(CasbinRuleSoftDelete(ptype="p", v0="data2_admin", v1="data2", v2="write"))
s.add(CasbinRuleSoftDelete(ptype="g", v0="alice", v1="data2_admin"))
await s.commit()
scriptdir = Path(os.path.dirname(os.path.realpath(__file__)))
model_path = scriptdir / "rbac_model.conf"
e = casbin.AsyncEnforcer(str(model_path), adapter)
await e.load_policy()
return e
async def test_custom_db_class(self):
"""Test that custom database class with softdelete works."""
class CustomRule(Base):
__tablename__ = "casbin_rule3"
__table_args__ = {"extend_existing": True}
id = Column(Integer, primary_key=True)
ptype = Column(String(255))
v0 = Column(String(255))
v1 = Column(String(255))
v2 = Column(String(255))
v3 = Column(String(255))
v4 = Column(String(255))
v5 = Column(String(255))
is_deleted = Column(Boolean, default=False)
not_exist = Column(String(255))
engine = create_async_engine("sqlite+aiosqlite://", future=True)
adapter = Adapter(engine, CustomRule, CustomRule.is_deleted)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
session = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async with session() as s:
s.add(CustomRule(not_exist="NotNone"))
await s.commit()
a = await s.execute(select(CustomRule))
self.assertEqual(a.scalars().all()[0].not_exist, "NotNone")
async def test_softdelete_flag(self):
"""Test that softdelete flag is set correctly when removing policies."""
e = await self.get_enforcer()
session_maker = e.adapter.session_local
async with session_maker() as session:
# Verify rule does not exist initially
self.assertFalse(e.enforce("alice", "data5", "read"))
rule = await query_for_rule(session, e.adapter, "p", "alice", "data5", "read")
self.assertIsNone(rule)
# Add new permission
await e.add_permission_for_user("alice", "data5", "read")
self.assertTrue(e.enforce("alice", "data5", "read"))
async with session_maker() as session:
rule = await query_for_rule(session, e.adapter, "p", "alice", "data5", "read")
self.assertIsNotNone(rule)
self.assertFalse(rule.is_deleted)
# Delete permission - should soft delete
await e.delete_permission_for_user("alice", "data5", "read")
self.assertFalse(e.enforce("alice", "data5", "read"))
async with session_maker() as session:
rule = await query_for_rule(session, e.adapter, "p", "alice", "data5", "read")
self.assertIsNotNone(rule)
self.assertTrue(rule.is_deleted)
async def test_save_policy_softdelete(self):
"""Test that save_policy correctly marks rules as deleted."""
e = await self.get_enforcer()
session_maker = e.adapter.session_local
# Turn off auto save
e.enable_auto_save(auto_save=False)
# Delete some preexisting rules using model's internal methods
e.get_model().remove_policy("p", "p", ["alice", "data1", "read"])
e.get_model().remove_policy("p", "p", ["bob", "data2", "write"])
# Delete a non existing rule (won't do anything in model)
e.get_model().remove_policy("p", "p", ["bob", "data100", "read"])
# Add some new rules using model's internal methods
e.get_model().add_policy("p", "p", ["alice", "data100", "read"])
e.get_model().add_policy("p", "p", ["bob", "data100", "write"])
# Write changes to database
await e.save_policy()
async with session_maker() as session:
# Check deleted rules are marked as deleted
rule1 = await query_for_rule(session, e.adapter, "p", "alice", "data1", "read")
self.assertTrue(rule1.is_deleted)
rule2 = await query_for_rule(session, e.adapter, "p", "bob", "data2", "write")
self.assertTrue(rule2.is_deleted)
# Non-existent rule should not be in DB
rule3 = await query_for_rule(session, e.adapter, "p", "bob", "data100", "read")
self.assertIsNone(rule3)
# New rules should not be deleted
rule4 = await query_for_rule(session, e.adapter, "p", "alice", "data100", "read")
self.assertIsNotNone(rule4)
self.assertFalse(rule4.is_deleted)
rule5 = await query_for_rule(session, e.adapter, "p", "bob", "data100", "write")
self.assertIsNotNone(rule5)
self.assertFalse(rule5.is_deleted)
async def test_softdelete_type_validation(self):
"""Test that non-Boolean softdelete attribute raises ValueError."""
class InvalidRule(Base):
__tablename__ = "invalid_rule"
id = Column(Integer, primary_key=True)
ptype = Column(String(255))
v0 = Column(String(255))
v1 = Column(String(255))
v2 = Column(String(255))
v3 = Column(String(255))
v4 = Column(String(255))
v5 = Column(String(255))
is_deleted = Column(String(255)) # Wrong type!
engine = create_async_engine("sqlite+aiosqlite://", future=True)
with self.assertRaises(ValueError) as context:
Adapter(engine, InvalidRule, InvalidRule.is_deleted)
self.assertIn("Boolean", str(context.exception))
async def test_remove_policies_with_softdelete(self):
"""Test that remove_policies correctly soft-deletes multiple rules."""
e = await self.get_enforcer()
session_maker = e.adapter.session_local
# Add multiple policies
await e.add_policies([["alice", "data10", "read"], ["bob", "data10", "write"], ["carol", "data10", "read"]])
# Verify they exist
self.assertTrue(e.enforce("alice", "data10", "read"))
self.assertTrue(e.enforce("bob", "data10", "write"))
self.assertTrue(e.enforce("carol", "data10", "read"))
# Remove multiple policies
await e.remove_policies([["alice", "data10", "read"], ["bob", "data10", "write"]])
# Verify they are soft-deleted
self.assertFalse(e.enforce("alice", "data10", "read"))
self.assertFalse(e.enforce("bob", "data10", "write"))
self.assertTrue(e.enforce("carol", "data10", "read"))
async with session_maker() as session:
rule1 = await query_for_rule(session, e.adapter, "p", "alice", "data10", "read")
self.assertIsNotNone(rule1)
self.assertTrue(rule1.is_deleted)
rule2 = await query_for_rule(session, e.adapter, "p", "bob", "data10", "write")
self.assertIsNotNone(rule2)
self.assertTrue(rule2.is_deleted)
rule3 = await query_for_rule(session, e.adapter, "p", "carol", "data10", "read")
self.assertIsNotNone(rule3)
self.assertFalse(rule3.is_deleted)
async def test_remove_filtered_policy_with_softdelete(self):
"""Test that remove_filtered_policy correctly soft-deletes matching rules."""
e = await self.get_enforcer()
session_maker = e.adapter.session_local
# Initial state verification
self.assertTrue(e.enforce("alice", "data2", "read"))
self.assertTrue(e.enforce("data2_admin", "data2", "read"))
# Remove all policies for data2 (field_index=1, value="data2")
await e.remove_filtered_policy(1, "data2")
# Verify policies are removed from enforcer
self.assertFalse(e.enforce("alice", "data2", "read"))
self.assertFalse(e.enforce("data2_admin", "data2", "read"))
async with session_maker() as session:
# All data2 policies should be soft-deleted
rule1 = await query_for_rule(session, e.adapter, "p", "data2_admin", "data2", "read")
self.assertIsNotNone(rule1)
self.assertTrue(rule1.is_deleted)
rule2 = await query_for_rule(session, e.adapter, "p", "data2_admin", "data2", "write")
self.assertIsNotNone(rule2)
self.assertTrue(rule2.is_deleted)
async def test_update_policy_with_softdelete(self):
"""Test that update_policy works correctly with soft delete."""
e = await self.get_enforcer()
session_maker = e.adapter.session_local
# Verify initial policy
self.assertTrue(e.enforce("alice", "data1", "read"))
self.assertFalse(e.enforce("alice", "data1", "write"))
# Update policy
await e.update_policy(["alice", "data1", "read"], ["alice", "data1", "write"])
# Verify updated policy
self.assertFalse(e.enforce("alice", "data1", "read"))
self.assertTrue(e.enforce("alice", "data1", "write"))
async with session_maker() as session:
# The updated rule should not be deleted
rule = await query_for_rule(session, e.adapter, "p", "alice", "data1", "write")
self.assertIsNotNone(rule)
self.assertFalse(rule.is_deleted)
async def test_load_policy_ignores_soft_deleted(self):
"""Test that load_policy ignores soft-deleted rules."""
e = await self.get_enforcer()
session_maker = e.adapter.session_local
# Delete a policy
await e.delete_permission_for_user("alice", "data1", "read")
async with session_maker() as session:
rule = await query_for_rule(session, e.adapter, "p", "alice", "data1", "read")
self.assertIsNotNone(rule)
self.assertTrue(rule.is_deleted)
# Create a new enforcer and load policy
scriptdir = Path(os.path.dirname(os.path.realpath(__file__)))
model_path = scriptdir / "rbac_model.conf"
e2 = casbin.AsyncEnforcer(str(model_path), e.adapter)
await e2.load_policy()
# The soft-deleted policy should not be loaded
self.assertFalse(e2.enforce("alice", "data1", "read"))
# Other policies should still be loaded
self.assertTrue(e2.enforce("bob", "data2", "write"))
async def test_load_filtered_policy_ignores_soft_deleted(self):
"""Test that load_filtered_policy ignores soft-deleted rules."""
e = await self.get_enforcer()
# Delete a policy
await e.delete_permission_for_user("bob", "data2", "write")
# Create filter for data2
filter = Filter()
filter.v1 = ["data2"]
# Create new enforcer with filtered policy
scriptdir = Path(os.path.dirname(os.path.realpath(__file__)))
model_path = scriptdir / "rbac_model.conf"
e2 = casbin.AsyncEnforcer(str(model_path), e.adapter)
await e2.load_filtered_policy(filter)
# Soft-deleted policy should not be loaded
self.assertFalse(e2.enforce("bob", "data2", "write"))
# Other data2 policies should be loaded
self.assertTrue(e2.enforce("data2_admin", "data2", "read"))
async def test_clear_policy_with_softdelete(self):
"""Test that clear_policy() marks all records as deleted when softdelete is enabled."""
e = await self.get_enforcer()
adapter = e.get_adapter()
engine = adapter._engine
# Verify there are policies in the database
async_session = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async with async_session() as s:
# Count total records (including soft-deleted)
total_result = await s.execute(select(CasbinRuleSoftDelete))
total_count = len(total_result.scalars().all())
self.assertGreater(total_count, 0, "There should be policies in the database before clearing")
# Count non-deleted records
active_result = await s.execute(select(CasbinRuleSoftDelete).where(CasbinRuleSoftDelete.is_deleted == False))
active_count = len(active_result.scalars().all())
self.assertGreater(active_count, 0, "There should be active policies before clearing")
# Clear all policies (soft delete)
await adapter.clear_policy()
# Verify all active policies are now marked as deleted
async with async_session() as s:
# Total count should remain the same (soft delete)
total_result = await s.execute(select(CasbinRuleSoftDelete))
total_after = len(total_result.scalars().all())
self.assertEqual(total_count, total_after, "Total records should remain the same with soft delete")
# Active count should be 0
active_result = await s.execute(select(CasbinRuleSoftDelete).where(CasbinRuleSoftDelete.is_deleted == False))
active_after = len(active_result.scalars().all())
self.assertEqual(active_after, 0, "All policies should be marked as deleted")
# All should be marked as deleted
deleted_result = await s.execute(select(CasbinRuleSoftDelete).where(CasbinRuleSoftDelete.is_deleted == True))
deleted_after = len(deleted_result.scalars().all())
self.assertEqual(deleted_after, total_count, "All policies should be marked as deleted")
# Verify enforcer still works after clearing (can load empty policy)
await e.load_policy()
self.assertFalse(e.enforce("alice", "data1", "read"))
# Verify we can add policies after clearing
await e.add_policy("eve", "data3", "read")
self.assertTrue(e.enforce("eve", "data3", "read"))