blob: 69f2fd1bf31f2a7edbaa6dcc8d911968917880a0 [file] [log] [blame]
# Copyright 2022 The casbin Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from unittest import TestCase
import casbin
import redis
from redis_watcher import WatcherOptions, new_watcher, new_publish_watcher
def get_examples(path):
examples_path = os.path.split(os.path.realpath(__file__))[0] + "/../examples/"
return os.path.abspath(examples_path + path)
class TestConfig(TestCase):
def test_watcher_init(self):
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.optional_update_callback = lambda event: print("update callback, event: {}".format(event))
w = new_watcher(test_option)
assert isinstance(w.sub_client, redis.client.PubSub)
assert isinstance(w.pub_client, redis.client.Redis)
def test_publish_watcher_init(self):
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.optional_update_callback = lambda event: print("update callback, event: {}".format(event))
w = new_publish_watcher(test_option)
assert w.sub_client is None
assert isinstance(w.pub_client, redis.client.Redis)
def test_watcher_init_without_callback(self):
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
w = new_watcher(test_option)
assert isinstance(w.sub_client, redis.client.PubSub)
assert isinstance(w.pub_client, redis.client.Redis)
def test_unsubscribe(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("callback_function, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
assert callback_flag is False
w.pub_client.publish("test", "test_value")
time.sleep(0.5)
assert callback_flag is True
w.unsubscribe(w.sub_client)
time.sleep(0.5)
callback_flag = False
w.pub_client.publish("test", "test_value")
time.sleep(0.5)
assert callback_flag is False
def test_call_back_function(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("callback_function, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
assert callback_flag is False
w.pub_client.publish("test", "test_value")
time.sleep(0.5)
assert callback_flag is True
def test_update(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("update callback, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
assert callback_flag is False
w.update()
time.sleep(0.5)
assert callback_flag is True
def test_update_for_add_policy(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("update for add policy, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
assert callback_flag is False
w.update_for_add_policy("test1", "test2", "test3")
time.sleep(0.5)
assert callback_flag is True
def test_update_for_remove_policy(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("update for remove policy callback, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
assert callback_flag is False
w.update_for_remove_policy("test1", "test2", "test3")
time.sleep(0.5)
assert callback_flag is True
def test_update_for_remove_filtered_policy(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("update for remove filtered policy callback, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
assert callback_flag is False
w.update_for_remove_filtered_policy("test1", "test2", "test3")
time.sleep(0.5)
assert callback_flag is True
def test_with_enforcer(self):
callback_flag = False
def callback_function(event):
nonlocal callback_flag
callback_flag = True
print("update for remove filtered policy callback, event: {}".format(event))
test_option = WatcherOptions()
test_option.host = "localhost"
test_option.port = "6379"
test_option.channel = "test"
test_option.optional_update_callback = callback_function
w = new_watcher(test_option)
e = casbin.Enforcer(get_examples("rbac_model.conf"), get_examples("rbac_policy.csv"))
e.set_watcher(w)
assert callback_flag is False
e.save_policy()
time.sleep(0.5)
assert callback_flag is True
# related update function not be called in py-casbin yet
e.add_policy("eve", "data3", "read")
e.remove_policy("eve", "data3", "read")
rules = [
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
]
e.add_policies(rules)
e.remove_policies(rules)