import { Subscription } from '../Subscription';
import { OperatorFunction, SchedulerLike } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { arrRemove } from '../util/arrRemove';
import { asyncScheduler } from '../scheduler/async';
import { popScheduler } from '../util/args';
import { executeSchedule } from '../util/executeSchedule';

/* tslint:disable:max-line-length */
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
export function bufferTime<T>(
  bufferTimeSpan: number,
  bufferCreationInterval: number | null | undefined,
  scheduler?: SchedulerLike
): OperatorFunction<T, T[]>;
export function bufferTime<T>(
  bufferTimeSpan: number,
  bufferCreationInterval: number | null | undefined,
  maxBufferSize: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, T[]>;
/* tslint:enable:max-line-length */

/**
 * Buffers the source Observable values for a specific time period.
 *
 * <span class="informal">Collects values from the past as an array, and emits
 * those arrays periodically in time.</span>
 *
 * Buffers values from the source for a specific time duration `bufferTimeSpan`.
 * Unless the optional argument `bufferCreationInterval` is given, it emits and
 * resets the buffer every `bufferTimeSpan` milliseconds. If
 * `bufferCreationInterval` is given, this operator opens the buffer every
 * `bufferCreationInterval` milliseconds and closes (emits and resets) the
 * buffer every `bufferTimeSpan` milliseconds. When the optional argument
 * `maxBufferSize` is specified, the buffer will be closed either after
 * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
 *
 * When the source completes, every buffer that is still open is emitted (in
 * the order the buffers were opened) before the result completes.
 *
 * ## Examples
 *
 * Every second, emit an array of the recent click events
 *
 * ```ts
 * import { fromEvent, bufferTime } from 'rxjs';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferTime(1000));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * Every 5 seconds, emit the click events from the next 2 seconds
 *
 * ```ts
 * import { fromEvent, bufferTime } from 'rxjs';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(bufferTime(2000, 5000));
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link buffer}
 * @see {@link bufferCount}
 * @see {@link bufferToggle}
 * @see {@link bufferWhen}
 * @see {@link windowTime}
 *
 * @param bufferTimeSpan The amount of time to fill each buffer array.
 * @param otherArgs The remaining optional arguments, in order:
 * `bufferCreationInterval` (the interval at which to start new buffers; when
 * it is `null`, `undefined` or negative, a new buffer is instead started each
 * time one is emitted), `maxBufferSize` (the maximum buffer size; a missing or
 * falsy value means no size limit) and `scheduler` (the scheduler on which to
 * schedule the intervals that determine buffer boundaries; defaults to
 * `asyncScheduler`).
 * @return A function that returns an Observable of arrays of buffered values.
 */
export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): OperatorFunction<T, T[]> {
  /** An open buffer together with the subscription that owns its scheduled emission. */
  type BufferRecord = { buffer: T[]; subs: Subscription };

  // The scheduler has to be taken off the argument list first: the positional
  // reads below rely on `otherArgs` holding only the numeric options.
  // NOTE(review): assumes `popScheduler` removes a trailing scheduler from `otherArgs` - confirm.
  const scheduler = popScheduler(otherArgs) ?? asyncScheduler;
  const bufferCreationInterval = (otherArgs[0] as number) ?? null;
  // `||` (not `??`) is deliberate: a `maxBufferSize` of 0 is treated as "no limit".
  const maxBufferSize = (otherArgs[1] as number) || Infinity;

  return operate((source, subscriber) => {
    // The active buffers and their related subscriptions.
    // Set to `null` on finalization so that no further buffers are started.
    let bufferRecords: BufferRecord[] | null = [];
    // If true, it means that every time we emit a buffer, we want to start a new buffer.
    // This is only really used for when *just* the buffer time span is passed.
    let restartOnEmit = false;

    /**
     * Does the work of emitting the buffer from the record, ensuring that the
     * record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
     * does not alter the buffer. Also checks to see if a new buffer needs to be started
     * after the emit.
     */
    const emit = (record: BufferRecord) => {
      const { buffer, subs } = record;
      // Cancel this buffer's scheduled emission; it is being emitted right now.
      subs.unsubscribe();
      arrRemove(bufferRecords, record);
      subscriber.next(buffer);
      if (restartOnEmit) {
        startBuffer();
      }
    };

    /**
     * Called every time we start a new buffer. This does
     * the work of scheduling a job at the requested bufferTimeSpan
     * that will emit the buffer (if it's not unsubscribed before then).
     */
    const startBuffer = () => {
      if (bufferRecords) {
        const subs = new Subscription();
        // Tie the buffer's scheduled work to the consumer's subscription.
        subscriber.add(subs);
        const buffer: T[] = [];
        const record: BufferRecord = {
          buffer,
          subs,
        };
        bufferRecords.push(record);
        executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan);
      }
    };

    if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
      // The user passed both a bufferTimeSpan (required), and a creation interval.
      // That means we need to start new buffers on the interval, and those buffers need
      // to wait the required time span before emitting.
      executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true);
    } else {
      restartOnEmit = true;
    }

    // Open the first buffer right away.
    startBuffer();

    const bufferTimeSubscriber = createOperatorSubscriber(
      subscriber,
      (value: T) => {
        // Copy the records, so if we need to remove one we
        // don't mutate the array we are iterating. It's hard, but not impossible to
        // set up a buffer time that could mutate the array and
        // cause issues here.
        const recordsCopy = bufferRecords!.slice();
        for (const record of recordsCopy) {
          // Loop over all active buffers and push the value into each of them.
          const { buffer } = record;
          buffer.push(value);
          // If the buffer has reached the max size, we need to emit it.
          if (maxBufferSize <= buffer.length) {
            emit(record);
          }
        }
      },
      () => {
        // The source completed, emit all of the active
        // buffers we have before we complete.
        while (bufferRecords?.length) {
          subscriber.next(bufferRecords.shift()!.buffer);
        }
        bufferTimeSubscriber?.unsubscribe();
        subscriber.complete();
        subscriber.unsubscribe();
      },
      // Pass all errors through to consumer.
      undefined,
      // Clean up: drop the records so `startBuffer` becomes a no-op.
      () => (bufferRecords = null)
    );

    source.subscribe(bufferTimeSubscriber);
  });
}