blob: b71989a151277b7e6d6e5568974dc991cc260992 [file] [log] [blame]
import { Watcher } from 'casbin';
import { ClientConfig, Subscription, Message } from '@google-cloud/pubsub';
import { PubsubConnection } from './pubsub';
export class PubsubWatcher implements Watcher {
private topicName: string;
private subscriptionName: string;
private subscription: Subscription;
private pubsubConnection: PubsubConnection;
private callback: () => void;
public static newWatcher(options?: ClientConfig, topicName?: string, subscriptionName?: string): PubsubWatcher {
return new PubsubWatcher(options, topicName, subscriptionName);
}
private constructor(options?: ClientConfig, topicName?: string, subscriptionName?: string) {
this.topicName = topicName || 'casbin';
this.subscriptionName = subscriptionName || 'sub_casbin';
this.pubsubConnection = new PubsubConnection(options);
this.pubsubConnection.open();
this.subscription = this.pubsubConnection.client.topic(this.topicName).subscription(this.subscriptionName);
// Purge all messages, no need to replay as casbin cache will be updated at start up
this.subscription
.seek(new Date())
.then(() => {
this.subscription.on('message', this.messageHandler.bind(this));
})
.catch(() => {});
}
private messageHandler(message: Message): void {
console.log(`Debug: ${this.subscriptionName} got message ID: ${message.id}`);
message.ack();
if (this.callback) {
this.callback();
}
}
public async update(): Promise<boolean> {
await this.pubsubConnection.client.topic(this.topicName).publishMessage({ data: Buffer.from('casbin updated') });
return true;
}
public setUpdateCallback(callback: () => void): void {
this.callback = callback;
}
public async close(): Promise<void> {
this.subscription.removeListener('message', this.messageHandler.bind(this));
await this.subscription.close();
await this.pubsubConnection.close();
}
}