RabbitMQ watcher for jCasbin, enabling distributed policy synchronization across multiple instances using RabbitMQ as the message broker.
<dependency> <groupId>org.casbin</groupId> <artifactId>jcasbin-rabbitmq-watcher</artifactId> <version>1.0.0</version> </dependency>
implementation 'org.casbin:jcasbin-rabbitmq-watcher:1.0.0'
import org.casbin.jcasbin.main.Enforcer; import org.casbin.watcher.RabbitMQWatcher; // Create enforcer Enforcer enforcer = new Enforcer("path/to/model.conf", "path/to/policy.csv"); // Create and set watcher RabbitMQWatcher watcher = new RabbitMQWatcher("amqp://guest:guest@localhost:5672"); enforcer.setWatcher(watcher); // The watcher will automatically reload policy when updates are received watcher.setUpdateCallback(() -> { enforcer.loadPolicy(); }); // When you update policy, notify other instances enforcer.addPolicy("alice", "data1", "read"); watcher.update(); // Close watcher when done watcher.close();
RabbitMQWatcher watcher = new RabbitMQWatcher("localhost", 5672);
RabbitMQWatcher watcher = new RabbitMQWatcher( "amqp://guest:guest@localhost:5672", "my_custom_exchange" );
For more detailed examples, see RabbitMQWatcherDemo.java which includes:
RabbitMQWatcher instance creates a unique instance ID (UUID)update() is called, publishes a message containing the instance ID┌─────────────┐ ┌──────────────────┐ ┌─────────────┐
│ Instance 1 │────────>│ RabbitMQ Fanout │────────>│ Instance 2 │
│ (Enforcer) │ update │ Exchange │ message │ (Enforcer) │
└─────────────┘ └──────────────────┘ └─────────────┘
^ │
│ │
└───────────────────── loadPolicy ─────────────────────┘
Each instance:
The watcher supports multiple ways to connect to RabbitMQ:
RabbitMQWatcher watcher = new RabbitMQWatcher("amqp://user:pass@host:5672/vhost");
RabbitMQWatcher watcher = new RabbitMQWatcher("localhost", 5672);
By default, the watcher uses the exchange name casbin_policy_updates. You can customize this:
RabbitMQWatcher watcher = new RabbitMQWatcher("amqp://localhost:5672", "my_exchange");
This project includes comprehensive integration tests using Testcontainers, which automatically starts a RabbitMQ container for testing.
mvn test
Note: Docker must be running for the tests to execute.
The test suite includes:
// Instance 1 Enforcer enforcer1 = new Enforcer("model.conf", "policy.csv"); RabbitMQWatcher watcher1 = new RabbitMQWatcher("amqp://localhost:5672"); enforcer1.setWatcher(watcher1); watcher1.setUpdateCallback(() -> enforcer1.loadPolicy()); // Instance 2 Enforcer enforcer2 = new Enforcer("model.conf", "policy.csv"); RabbitMQWatcher watcher2 = new RabbitMQWatcher("amqp://localhost:5672"); enforcer2.setWatcher(watcher2); watcher2.setUpdateCallback(() -> enforcer2.loadPolicy()); // Update policy in instance 1 enforcer1.addPolicy("alice", "data2", "write"); watcher1.update(); // This will trigger enforcer2 to reload // Instance 2 now has the updated policy boolean result = enforcer2.enforce("alice", "data2", "write"); // true
The watcher uses SLF4J for logging. Configure your logging framework (e.g., Logback, Log4j2) to control log output.
Example Logback configuration:
<configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <logger name="org.casbin.watcher" level="INFO"/> <root level="INFO"> <appender-ref ref="STDOUT" /> </root> </configuration>
RabbitMQWatcher(String host, int port) - Connect using host and portRabbitMQWatcher(String host, int port, String exchangeName) - Connect with custom exchangeRabbitMQWatcher(String uri) - Connect using AMQP URIRabbitMQWatcher(String uri, String exchangeName) - Connect using URI with custom exchangevoid setUpdateCallback(Runnable callback) - Set callback for policy updatesvoid update() - Broadcast policy update to other instancesvoid close() - Close connection and release resourcesString getInstanceId() - Get unique instance identifierboolean isRunning() - Check if watcher is runningContributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.