| import { OperatorFunction, ObservableInput } from '../types'; |
| import { operate } from '../util/lift'; |
| import { noop } from '../util/noop'; |
| import { createOperatorSubscriber } from './OperatorSubscriber'; |
| import { innerFrom } from '../observable/innerFrom'; |
| |
| /** |
| * Buffers the source Observable values until `closingNotifier` emits. |
| * |
| * <span class="informal">Collects values from the past as an array, and emits |
| * that array only when another Observable emits.</span> |
| * |
| *  |
| * |
| * Buffers the incoming Observable values until the given `closingNotifier` |
| * `ObservableInput` (that internally gets converted to an Observable) |
| * emits a value, at which point it emits the buffer on the output |
| * Observable and starts a new buffer internally, awaiting the next time |
| * `closingNotifier` emits. |
| * |
| * ## Example |
| * |
| * On every click, emit array of most recent interval events |
| * |
| * ```ts |
| * import { fromEvent, interval, buffer } from 'rxjs'; |
| * |
| * const clicks = fromEvent(document, 'click'); |
| * const intervalEvents = interval(1000); |
| * const buffered = intervalEvents.pipe(buffer(clicks)); |
| * buffered.subscribe(x => console.log(x)); |
| * ``` |
| * |
| * @see {@link bufferCount} |
| * @see {@link bufferTime} |
| * @see {@link bufferToggle} |
| * @see {@link bufferWhen} |
| * @see {@link window} |
| * |
| * @param closingNotifier An `ObservableInput` that signals the |
| * buffer to be emitted on the output Observable. |
| * @return A function that returns an Observable of buffers, which are arrays |
| * of values. |
| */ |
| export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]> { |
| return operate((source, subscriber) => { |
| // The current buffered values. |
| let currentBuffer: T[] = []; |
| |
| // Subscribe to our source. |
| source.subscribe( |
| createOperatorSubscriber( |
| subscriber, |
| (value) => currentBuffer.push(value), |
| () => { |
| subscriber.next(currentBuffer); |
| subscriber.complete(); |
| } |
| ) |
| ); |
| |
| // Subscribe to the closing notifier. |
| innerFrom(closingNotifier).subscribe( |
| createOperatorSubscriber( |
| subscriber, |
| () => { |
| // Start a new buffer and emit the previous one. |
| const b = currentBuffer; |
| currentBuffer = []; |
| subscriber.next(b); |
| }, |
| noop |
| ) |
| ); |
| |
| return () => { |
| // Ensure buffered values are released on finalization. |
| currentBuffer = null!; |
| }; |
| }); |
| } |