| import { Connectable, ObservableInput, SubjectLike } from '../types'; |
| import { Subject } from '../Subject'; |
| import { Subscription } from '../Subscription'; |
| import { Observable } from '../Observable'; |
| import { defer } from './defer'; |
| |
/**
 * Options accepted by `connectable`.
 */
export interface ConnectableConfig<T> {
  /**
   * Factory producing the subject through which the source is multicast.
   *
   * This property is required whenever a config object is supplied. A plain
   * {@link Subject} is only used when the config argument is omitted entirely.
   */
  connector: () => SubjectLike<T>;
  /**
   * Controls what happens to the connecting subject once the connection ends.
   *
   * - `true` (the value used when this property is left out): a fresh subject is
   *   requested from `connector`, so the resulting observable becomes "cold"
   *   again and can be reconnected.
   * - `false`: the same subject is kept, so the resulting observable does not go
   *   "cold" again and later repeats or resubscriptions go to that same subject.
   */
  resetOnDisconnect?: boolean;
}
| |
/**
 * Configuration applied when `connectable` is called without a `config`
 * argument: multicast through a plain `Subject` and reset on disconnect.
 */
const DEFAULT_CONFIG: ConnectableConfig<unknown> = {
  connector: () => {
    return new Subject<unknown>();
  },
  resetOnDisconnect: true,
};
| |
/**
 * Wraps `source` in an observable that only starts multicasting once its
 * `connect()` method has been called.
 *
 * Subscribers of the returned observable are subscribed to the current
 * connecting subject. Calling `connect()` subscribes that subject to the source.
 *
 * @param source The observable input to make connectable.
 * @param config Options for `connectable`; when omitted, the module's default
 * configuration is used.
 * @returns A "connectable" observable exposing a `connect()` method, which must be
 * called to link the source to all consumers through the connector's subject.
 */
export function connectable<T>(source: ObservableInput<T>, config: ConnectableConfig<T> = DEFAULT_CONFIG): Connectable<T> {
  const { connector: createSubject, resetOnDisconnect: shouldReset = true } = config;

  // Subscription for the current connection to the source, if one was made.
  let activeConnection: Subscription | null = null;
  // Subject that consumers are attached to; created eagerly from the connector.
  let activeSubject = createSubject();

  const multicasted: any = new Observable<T>((subscriber) => activeSubject.subscribe(subscriber));

  // `connect` is what callers invoke to hook the source up to the subject
  // that multicasts it.
  multicasted.connect = () => {
    // An open connection is reused as-is.
    if (activeConnection && !activeConnection.closed) {
      return activeConnection;
    }

    // `defer` postpones converting `source` until the subject is subscribed.
    activeConnection = defer(() => source).subscribe(activeSubject);
    if (shouldReset) {
      // Swap in a fresh subject as part of the connection's teardown.
      activeConnection.add(() => {
        activeSubject = createSubject();
      });
    }
    return activeConnection;
  };

  return multicasted;
}