blob: 120ce3447172a07d71c5aa78fab9e56fdcbaac49 [file] [log] [blame]
# Copyright 2021 The casbin Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from casbin.model.policy_op import PolicyOp
class Assertion:
def __init__(self):
self.logger = logging.getLogger("casbin.policy")
self.key = ""
self.value = ""
self.tokens = []
self.params_tokens = []
self.policy = []
self.rm = None
self.cond_rm = None
self.priority_index: int = -1
self.policy_map: dict = {}
self.field_index_map: dict = {}
def build_role_links(self, rm):
self.rm = rm
count = self.value.count("_")
if count < 2:
raise RuntimeError('the number of "_" in role definition should be at least 2')
for rule in self.policy:
if len(rule) < count:
raise RuntimeError("grouping policy elements do not meet role definition")
if len(rule) > count:
rule = rule[:count]
self.rm.add_link(*rule[:count])
self.logger.info("Role links for: {}".format(self.key))
self.rm.print_roles()
def build_incremental_role_links(self, rm, op, rules):
self.rm = rm
count = self.value.count("_")
if count < 2:
raise RuntimeError('the number of "_" in role definition should be at least 2')
for rule in rules:
if len(rule) < count:
raise TypeError("grouping policy elements do not meet role definition")
if len(rule) > count:
rule = rule[:count]
if op == PolicyOp.Policy_add:
rm.add_link(rule[0], rule[1], *rule[2:])
elif op == PolicyOp.Policy_remove:
rm.delete_link(rule[0], rule[1], *rule[2:])
else:
raise TypeError("Invalid operation: " + str(op))
def build_incremental_conditional_role_links(self, cond_rm, op, rules):
self.cond_rm = cond_rm
count = self.value.count("_")
if count < 2:
raise RuntimeError('the number of "_" in role definition should be at least 2')
for rule in rules:
if len(rule) < count:
raise TypeError("grouping policy elements do not meet role definition")
if len(rule) > count:
rule = rule[:count]
domain_rule = rule[2 : len(self.tokens)]
if op == PolicyOp.Policy_add:
self.add_conditional_role_link(rule, domain_rule)
elif op == PolicyOp.Policy_remove:
self.cond_rm.delete_link(rule[0], rule[1], *rule[2:])
else:
raise TypeError("Invalid operation: " + str(op))
def build_conditional_role_links(self, cond_rm):
self.cond_rm = cond_rm
count = self.value.count("_")
if count < 2:
raise RuntimeError('the number of "_" in role definition should be at least 2')
for rule in self.policy:
if len(rule) < count:
raise TypeError("grouping policy elements do not meet role definition")
if len(rule) > count:
rule = rule[:count]
domain_rule = rule[2 : len(self.tokens)]
self.add_conditional_role_link(rule, domain_rule)
def add_conditional_role_link(self, rule, domain_rule):
if not domain_rule:
self.cond_rm.add_link(rule[0], rule[1])
self.cond_rm.set_link_condition_func_params(rule[0], rule[1], *rule[len(self.tokens) :])
else:
domain = domain_rule[0]
self.cond_rm.add_link(rule[0], rule[1], domain)
self.cond_rm.set_domain_link_condition_func_params(rule[0], rule[1], domain, *rule[len(self.tokens) :])