| import { MonoTypeOperatorFunction } from '../types'; |
| import { identity } from '../util/identity'; |
| import { operate } from '../util/lift'; |
| import { createOperatorSubscriber } from './OperatorSubscriber'; |
| |
| /** |
| * Skip a specified number of values before the completion of an observable. |
| * |
| *  |
| * |
| * Returns an observable that will emit values as soon as it can, given a number of |
| * skipped values. For example, if you `skipLast(3)` on a source, when the source |
| * emits its fourth value, the first value the source emitted will finally be emitted |
| * from the returned observable, as it is no longer part of what needs to be skipped. |
| * |
| * All values emitted by the result of `skipLast(N)` will be delayed by `N` emissions, |
| * as each value is held in a buffer until enough values have been emitted that that |
| * the buffered value may finally be sent to the consumer. |
| * |
| * After subscribing, unsubscribing will not result in the emission of the buffered |
| * skipped values. |
| * |
| * ## Example |
| * |
| * Skip the last 2 values of an observable with many values |
| * |
| * ```ts |
| * import { of, skipLast } from 'rxjs'; |
| * |
| * const numbers = of(1, 2, 3, 4, 5); |
| * const skipLastTwo = numbers.pipe(skipLast(2)); |
| * skipLastTwo.subscribe(x => console.log(x)); |
| * |
| * // Results in: |
| * // 1 2 3 |
| * // (4 and 5 are skipped) |
| * ``` |
| * |
| * @see {@link skip} |
| * @see {@link skipUntil} |
| * @see {@link skipWhile} |
| * @see {@link take} |
| * |
| * @param skipCount Number of elements to skip from the end of the source Observable. |
| * @return A function that returns an Observable that skips the last `count` |
| * values emitted by the source Observable. |
| */ |
| export function skipLast<T>(skipCount: number): MonoTypeOperatorFunction<T> { |
| return skipCount <= 0 |
| ? // For skipCounts less than or equal to zero, we are just mirroring the source. |
| identity |
| : operate((source, subscriber) => { |
| // A ring buffer to hold the values while we wait to see |
| // if we can emit it or it's part of the "skipped" last values. |
| // Note that it is the _same size_ as the skip count. |
| let ring: T[] = new Array(skipCount); |
| // The number of values seen so far. This is used to get |
| // the index of the current value when it arrives. |
| let seen = 0; |
| source.subscribe( |
| createOperatorSubscriber(subscriber, (value) => { |
| // Get the index of the value we have right now |
| // relative to all other values we've seen, then |
| // increment `seen`. This ensures we've moved to |
| // the next slot in our ring buffer. |
| const valueIndex = seen++; |
| if (valueIndex < skipCount) { |
| // If we haven't seen enough values to fill our buffer yet, |
| // Then we aren't to a number of seen values where we can |
| // emit anything, so let's just start by filling the ring buffer. |
| ring[valueIndex] = value; |
| } else { |
| // We are traversing over the ring array in such |
| // a way that when we get to the end, we loop back |
| // and go to the start. |
| const index = valueIndex % skipCount; |
| // Pull the oldest value out so we can emit it, |
| // and stuff the new value in it's place. |
| const oldValue = ring[index]; |
| ring[index] = value; |
| // Emit the old value. It is important that this happens |
| // after we swap the value in the buffer, if it happens |
| // before we swap the value in the buffer, then a synchronous |
| // source can get the buffer out of whack. |
| subscriber.next(oldValue); |
| } |
| }) |
| ); |
| |
| return () => { |
| // Release our values in memory |
| ring = null!; |
| }; |
| }); |
| } |