| import { Subscriber } from '../Subscriber'; |
| export function refCount() { |
| return function refCountOperatorFunction(source) { |
| return source.lift(new RefCountOperator(source)); |
| }; |
| } |
| class RefCountOperator { |
| constructor(connectable) { |
| this.connectable = connectable; |
| } |
| call(subscriber, source) { |
| const { connectable } = this; |
| connectable._refCount++; |
| const refCounter = new RefCountSubscriber(subscriber, connectable); |
| const subscription = source.subscribe(refCounter); |
| if (!refCounter.closed) { |
| refCounter.connection = connectable.connect(); |
| } |
| return subscription; |
| } |
| } |
| class RefCountSubscriber extends Subscriber { |
| constructor(destination, connectable) { |
| super(destination); |
| this.connectable = connectable; |
| } |
| _unsubscribe() { |
| const { connectable } = this; |
| if (!connectable) { |
| this.connection = null; |
| return; |
| } |
| this.connectable = null; |
| const refCount = connectable._refCount; |
| if (refCount <= 0) { |
| this.connection = null; |
| return; |
| } |
| connectable._refCount = refCount - 1; |
| if (refCount > 1) { |
| this.connection = null; |
| return; |
| } |
| /// |
| // Compare the local RefCountSubscriber's connection Subscription to the |
| // connection Subscription on the shared ConnectableObservable. In cases |
| // where the ConnectableObservable source synchronously emits values, and |
| // the RefCountSubscriber's downstream Observers synchronously unsubscribe, |
| // execution continues to here before the RefCountOperator has a chance to |
| // supply the RefCountSubscriber with the shared connection Subscription. |
| // For example: |
| // ``` |
| // Observable.range(0, 10) |
| // .publish() |
| // .refCount() |
| // .take(5) |
| // .subscribe(); |
| // ``` |
| // In order to account for this case, RefCountSubscriber should only dispose |
| // the ConnectableObservable's shared connection Subscription if the |
| // connection Subscription exists, *and* either: |
| // a. RefCountSubscriber doesn't have a reference to the shared connection |
| // Subscription yet, or, |
| // b. RefCountSubscriber's connection Subscription reference is identical |
| // to the shared connection Subscription |
| /// |
| const { connection } = this; |
| const sharedConnection = connectable._connection; |
| this.connection = null; |
| if (sharedConnection && (!connection || sharedConnection === connection)) { |
| sharedConnection.unsubscribe(); |
| } |
| } |
| } |
| //# sourceMappingURL=refCount.js.map |