| import { EMPTY } from '../observable/empty'; |
| import { MonoTypeOperatorFunction } from '../types'; |
| import { operate } from '../util/lift'; |
| import { createOperatorSubscriber } from './OperatorSubscriber'; |
| |
| /** |
| * Waits for the source to complete, then emits the last N values from the source, |
| * as specified by the `count` argument. |
| * |
| *  |
| * |
| * `takeLast` results in an observable that will hold values up to `count` values in memory, |
| * until the source completes. It then pushes all values in memory to the consumer, in the |
| * order they were received from the source, then notifies the consumer that it is |
| * complete. |
| * |
| * If for some reason the source completes before the `count` supplied to `takeLast` is reached, |
| * all values received until that point are emitted, and then completion is notified. |
| * |
| * **Warning**: Using `takeLast` with an observable that never completes will result |
| * in an observable that never emits a value. |
| * |
| * ## Example |
| * |
| * Take the last 3 values of an Observable with many values |
| * |
| * ```ts |
| * import { range, takeLast } from 'rxjs'; |
| * |
| * const many = range(1, 100); |
| * const lastThree = many.pipe(takeLast(3)); |
| * lastThree.subscribe(x => console.log(x)); |
| * ``` |
| * |
| * @see {@link take} |
| * @see {@link takeUntil} |
| * @see {@link takeWhile} |
| * @see {@link skip} |
| * |
| * @param count The maximum number of values to emit from the end of |
| * the sequence of values emitted by the source Observable. |
| * @return A function that returns an Observable that emits at most the last |
| * `count` values emitted by the source Observable. |
| */ |
| export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> { |
| return count <= 0 |
| ? () => EMPTY |
| : operate((source, subscriber) => { |
| // This buffer will hold the values we are going to emit |
| // when the source completes. Since we only want to take the |
| // last N values, we can't emit until we're sure we're not getting |
| // any more values. |
| let buffer: T[] = []; |
| source.subscribe( |
| createOperatorSubscriber( |
| subscriber, |
| (value) => { |
| // Add the most recent value onto the end of our buffer. |
| buffer.push(value); |
| // If our buffer is now larger than the number of values we |
| // want to take, we remove the oldest value from the buffer. |
| count < buffer.length && buffer.shift(); |
| }, |
| () => { |
| // The source completed, we now know what are last values |
| // are, emit them in the order they were received. |
| for (const value of buffer) { |
| subscriber.next(value); |
| } |
| subscriber.complete(); |
| }, |
| // Errors are passed through to the consumer |
| undefined, |
| () => { |
| // During finalization release the values in our buffer. |
| buffer = null!; |
| } |
| ) |
| ); |
| }); |
| } |