| import { innerFrom } from '../observable/innerFrom'; |
| import { Subject } from '../Subject'; |
| import { SafeSubscriber } from '../Subscriber'; |
| import { Subscription } from '../Subscription'; |
| import { MonoTypeOperatorFunction, SubjectLike, ObservableInput } from '../types'; |
| import { operate } from '../util/lift'; |
| |
| export interface ShareConfig<T> { |
| /** |
| * The factory used to create the subject that will connect the source observable to |
| * multicast consumers. |
| */ |
| connector?: () => SubjectLike<T>; |
| /** |
| * If `true`, the resulting observable will reset internal state on error from source and return to a "cold" state. This |
| * allows the resulting observable to be "retried" in the event of an error. |
| * If `false`, when an error comes from the source it will push the error into the connecting subject, and the subject |
| * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries |
| * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however |
| * {@link ReplaySubject} will also push its buffered values before pushing the error. |
| * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained |
| * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. |
| */ |
| resetOnError?: boolean | ((error: any) => ObservableInput<any>); |
| /** |
| * If `true`, the resulting observable will reset internal state on completion from source and return to a "cold" state. This |
| * allows the resulting observable to be "repeated" after it is done. |
| * If `false`, when the source completes, it will push the completion through the connecting subject, and the subject |
| * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats |
| * or resubscriptions will resubscribe to that same subject. |
| * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained |
| * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. |
| */ |
| resetOnComplete?: boolean | (() => ObservableInput<any>); |
| /** |
| * If `true`, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the |
| * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next |
| * time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to |
| * again. |
| * If `false`, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject |
| * will remain connected to the source, and new subscriptions to the result will be connected through that same subject. |
| * It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained |
| * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. |
| */ |
| resetOnRefCountZero?: boolean | (() => ObservableInput<any>); |
| } |
| |
| export function share<T>(): MonoTypeOperatorFunction<T>; |
| |
| export function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>; |
| |
| /** |
| * Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one |
| * Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will |
| * unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream `hot`. |
| * This is an alias for `multicast(() => new Subject()), refCount()`. |
| * |
| * The subscription to the underlying source Observable can be reset (unsubscribe and resubscribe for new subscribers), |
| * if the subscriber count to the shared observable drops to 0, or if the source Observable errors or completes. It is |
| * possible to use notifier factories for the resets to allow for behaviors like conditional or delayed resets. Please |
| * note that resetting on error or complete of the source Observable does not behave like a transparent retry or restart |
| * of the source because the error or complete will be forwarded to all subscribers and their subscription will be |
| * closed. Only new subscribers after a reset on error or complete happened will cause a fresh subscription to the |
| * source. To achieve transparent retries or restarts pipe the source through appropriate operators before sharing. |
| * |
| *  |
| * |
| * ## Example |
| * |
| * Generate new multicast Observable from the `source` Observable value |
| * |
| * ```ts |
| * import { interval, tap, map, take, share } from 'rxjs'; |
| * |
| * const source = interval(1000).pipe( |
| * tap(x => console.log('Processing: ', x)), |
| * map(x => x * x), |
| * take(6), |
| * share() |
| * ); |
| * |
| * source.subscribe(x => console.log('subscription 1: ', x)); |
| * source.subscribe(x => console.log('subscription 2: ', x)); |
| * |
| * // Logs: |
| * // Processing: 0 |
| * // subscription 1: 0 |
| * // subscription 2: 0 |
| * // Processing: 1 |
| * // subscription 1: 1 |
| * // subscription 2: 1 |
| * // Processing: 2 |
| * // subscription 1: 4 |
| * // subscription 2: 4 |
| * // Processing: 3 |
| * // subscription 1: 9 |
| * // subscription 2: 9 |
| * // Processing: 4 |
| * // subscription 1: 16 |
| * // subscription 2: 16 |
| * // Processing: 5 |
| * // subscription 1: 25 |
| * // subscription 2: 25 |
| * ``` |
| * |
| * ## Example with notifier factory: Delayed reset |
| * |
| * ```ts |
| * import { interval, take, share, timer } from 'rxjs'; |
| * |
| * const source = interval(1000).pipe( |
| * take(3), |
| * share({ |
| * resetOnRefCountZero: () => timer(1000) |
| * }) |
| * ); |
| * |
| * const subscriptionOne = source.subscribe(x => console.log('subscription 1: ', x)); |
| * setTimeout(() => subscriptionOne.unsubscribe(), 1300); |
| * |
| * setTimeout(() => source.subscribe(x => console.log('subscription 2: ', x)), 1700); |
| * |
| * setTimeout(() => source.subscribe(x => console.log('subscription 3: ', x)), 5000); |
| * |
| * // Logs: |
| * // subscription 1: 0 |
| * // (subscription 1 unsubscribes here) |
| * // (subscription 2 subscribes here ~400ms later, source was not reset) |
| * // subscription 2: 1 |
| * // subscription 2: 2 |
| * // (subscription 2 unsubscribes here) |
| * // (subscription 3 subscribes here ~2000ms later, source did reset before) |
| * // subscription 3: 0 |
| * // subscription 3: 1 |
| * // subscription 3: 2 |
| * ``` |
| * |
| * @see {@link shareReplay} |
| * |
| * @return A function that returns an Observable that mirrors the source. |
| */ |
| export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction<T> { |
| const { connector = () => new Subject<T>(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; |
| // It's necessary to use a wrapper here, as the _operator_ must be |
| // referentially transparent. Otherwise, it cannot be used in calls to the |
| // static `pipe` function - to create a partial pipeline. |
| // |
| // The _operator function_ - the function returned by the _operator_ - will |
| // not be referentially transparent - as it shares its source - but the |
| // _operator function_ is called when the complete pipeline is composed via a |
| // call to a source observable's `pipe` method - not when the static `pipe` |
| // function is called. |
| return (wrapperSource) => { |
| let connection: SafeSubscriber<T> | undefined; |
| let resetConnection: Subscription | undefined; |
| let subject: SubjectLike<T> | undefined; |
| let refCount = 0; |
| let hasCompleted = false; |
| let hasErrored = false; |
| |
| const cancelReset = () => { |
| resetConnection?.unsubscribe(); |
| resetConnection = undefined; |
| }; |
| // Used to reset the internal state to a "cold" |
| // state, as though it had never been subscribed to. |
| const reset = () => { |
| cancelReset(); |
| connection = subject = undefined; |
| hasCompleted = hasErrored = false; |
| }; |
| const resetAndUnsubscribe = () => { |
| // We need to capture the connection before |
| // we reset (if we need to reset). |
| const conn = connection; |
| reset(); |
| conn?.unsubscribe(); |
| }; |
| |
| return operate<T, T>((source, subscriber) => { |
| refCount++; |
| if (!hasErrored && !hasCompleted) { |
| cancelReset(); |
| } |
| |
| // Create the subject if we don't have one yet. Grab a local reference to |
| // it as well, which avoids non-null assertions when using it and, if we |
| // connect to it now, then error/complete need a reference after it was |
| // reset. |
| const dest = (subject = subject ?? connector()); |
| |
| // Add the finalization directly to the subscriber - instead of returning it - |
| // so that the handling of the subscriber's unsubscription will be wired |
| // up _before_ the subscription to the source occurs. This is done so that |
| // the assignment to the source connection's `closed` property will be seen |
| // by synchronous firehose sources. |
| subscriber.add(() => { |
| refCount--; |
| |
| // If we're resetting on refCount === 0, and it's 0, we only want to do |
| // that on "unsubscribe", really. Resetting on error or completion is a different |
| // configuration. |
| if (refCount === 0 && !hasErrored && !hasCompleted) { |
| resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); |
| } |
| }); |
| |
| // The following line adds the subscription to the subscriber passed. |
| // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. |
| dest.subscribe(subscriber); |
| |
| if ( |
| !connection && |
| // Check this shareReplay is still activate - it can be reset to 0 |
| // and be "unsubscribed" _before_ it actually subscribes. |
| // If we were to subscribe then, it'd leak and get stuck. |
| refCount > 0 |
| ) { |
| // We need to create a subscriber here - rather than pass an observer and |
| // assign the returned subscription to connection - because it's possible |
| // for reentrant subscriptions to the shared observable to occur and in |
| // those situations we want connection to be already-assigned so that we |
| // don't create another connection to the source. |
| connection = new SafeSubscriber({ |
| next: (value) => dest.next(value), |
| error: (err) => { |
| hasErrored = true; |
| cancelReset(); |
| resetConnection = handleReset(reset, resetOnError, err); |
| dest.error(err); |
| }, |
| complete: () => { |
| hasCompleted = true; |
| cancelReset(); |
| resetConnection = handleReset(reset, resetOnComplete); |
| dest.complete(); |
| }, |
| }); |
| innerFrom(source).subscribe(connection); |
| } |
| })(wrapperSource); |
| }; |
| } |
| |
| function handleReset<T extends unknown[] = never[]>( |
| reset: () => void, |
| on: boolean | ((...args: T) => ObservableInput<any>), |
| ...args: T |
| ): Subscription | undefined { |
| if (on === true) { |
| reset(); |
| return; |
| } |
| |
| if (on === false) { |
| return; |
| } |
| |
| const onSubscriber = new SafeSubscriber({ |
| next: () => { |
| onSubscriber.unsubscribe(); |
| reset(); |
| }, |
| }); |
| |
| return innerFrom(on(...args)).subscribe(onSubscriber); |
| } |