Casbin Informer Watcher is a Kubernetes informer-based watcher for Casbin. This watcher enables real-time policy synchronization across multiple Casbin enforcer instances by watching Kubernetes Custom Resource Definitions (CRDs).
SyncedEnforcer in multi-threaded environmentsgo get github.com/casbin/casbin-informer-watcher
package main import ( "log" "github.com/casbin/casbin/v2" informerwatcher "github.com/casbin/casbin-informer-watcher" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" ) func main() { // Load Kubernetes configuration config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { log.Fatalf("Failed to load kubeconfig: %v", err) } // Create dynamic client client, err := dynamic.NewForConfig(config) if err != nil { log.Fatalf("Failed to create dynamic client: %v", err) } // Define the GVR for your policy CRD gvr := schema.GroupVersionResource{ Group: "casbin.org", Version: "v1", Resource: "policies", } // Create the watcher watcher, err := informerwatcher.NewWatcher(client, gvr, "default", informerwatcher.WatcherOptions{}) if err != nil { log.Fatalf("Failed to create watcher: %v", err) } defer watcher.Close() // Initialize the enforcer e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") if err != nil { log.Fatalf("Failed to create enforcer: %v", err) } // Set the watcher for the enforcer err = e.SetWatcher(watcher) if err != nil { log.Fatalf("Failed to set watcher: %v", err) } // By default, the watcher's callback is automatically set to the // enforcer's LoadPolicy() in the SetWatcher() call. // You can change it by explicitly setting a callback. err = watcher.SetUpdateCallback(informerwatcher.DefaultUpdateCallback(e)) if err != nil { log.Fatalf("Failed to set callback: %v", err) } log.Println("Watcher is running and monitoring policy changes...") select {} // Keep the program running }
package main import ( "log" "time" "github.com/casbin/casbin/v2" informerwatcher "github.com/casbin/casbin-informer-watcher" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" ) func main() { // Load Kubernetes configuration config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { log.Fatalf("Failed to load kubeconfig: %v", err) } // Create dynamic client client, err := dynamic.NewForConfig(config) if err != nil { log.Fatalf("Failed to create dynamic client: %v", err) } // Define the GVR for your policy CRD gvr := schema.GroupVersionResource{ Group: "casbin.org", Version: "v1", Resource: "policies", } // Create watcher with custom options options := informerwatcher.WatcherOptions{ LocalID: "instance-1", // Custom instance identifier IgnoreSelf: true, // Ignore updates from this instance ResyncPeriod: 30 * time.Second, // Resync period with API server } watcher, err := informerwatcher.NewWatcher(client, gvr, "default", options) if err != nil { log.Fatalf("Failed to create watcher: %v", err) } defer watcher.Close() // Initialize the enforcer e, err := casbin.NewEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") if err != nil { log.Fatalf("Failed to create enforcer: %v", err) } // Set the watcher err = e.SetWatcher(watcher) if err != nil { log.Fatalf("Failed to set watcher: %v", err) } // Custom callback that logs updates customCallback := func(msg string) { log.Printf("Policy update received: %s\n", msg) informerwatcher.DefaultUpdateCallback(e)(msg) } err = watcher.SetUpdateCallback(customCallback) if err != nil { log.Fatalf("Failed to set callback: %v", err) } log.Println("Watcher is running with custom configuration...") select {} // Keep the program running }
package main import ( "log" "github.com/casbin/casbin/v2" informerwatcher "github.com/casbin/casbin-informer-watcher" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" ) func main() { // Setup client and GVR (same as basic example) config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) client, _ := dynamic.NewForConfig(config) gvr := schema.GroupVersionResource{ Group: "casbin.org", Version: "v1", Resource: "policies", } watcher, _ := informerwatcher.NewWatcher(client, gvr, "default", informerwatcher.WatcherOptions{}) defer watcher.Close() // Use SyncedEnforcer for concurrency-safe operations e, err := casbin.NewSyncedEnforcer("examples/rbac_model.conf", "examples/rbac_policy.csv") if err != nil { log.Fatalf("Failed to create synced enforcer: %v", err) } err = e.SetWatcher(watcher) if err != nil { log.Fatalf("Failed to set watcher: %v", err) } log.Println("SyncedEnforcer is running with watcher...") select {} }
The watcher supports all standard Casbin policy update operations:
Update: Full policy reloadUpdateForAddPolicy: Add a single policy ruleUpdateForRemovePolicy: Remove a single policy ruleUpdateForAddPolicies: Add multiple policy rulesUpdateForRemovePolicies: Remove multiple policy rulesUpdateForRemoveFilteredPolicy: Remove filtered policy rulesUpdateForUpdatePolicy: Update a single policy ruleUpdateForUpdatePolicies: Update multiple policy rulesUpdateForSavePolicy: Save policy to storageYou‘ll need to define a CRD for your Casbin policies. Here’s a basic example:
apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: name: policies.casbin.org spec: group: casbin.org versions: - name: v1 served: true storage: true schema: openAPIV3Schema: type: object properties: spec: type: object properties: policy: type: string scope: Namespaced names: plural: policies singular: policy kind: Policy
Run the test suite:
go test -v ./...
Run tests with coverage:
go test -v -coverprofile=coverage.out ./... go tool cover -html=coverage.out
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.