| import { OperatorFunction, ObservableInput } from '../types'; |
| import { operate } from '../util/lift'; |
| import { createOperatorSubscriber } from './OperatorSubscriber'; |
| import { innerFrom } from '../observable/innerFrom'; |
| |
/**
 * Compares all values of two observables in sequence using an optional comparator function
 * and returns an observable of a single boolean value representing whether or not the two sequences
 * are equal.
 *
 * <span class="informal">Checks to see if all values emitted by both observables are equal, in order.</span>
 *
 * `sequenceEqual` subscribes to the source observable and to the `compareTo` `ObservableInput` (which is
 * internally converted to an observable) and buffers incoming values from each of them. Whenever either
 * observable emits a value, it is either compared against the oldest buffered value of the other
 * observable, or buffered until the other observable emits its counterpart. If any value pair doesn't
 * match, the returned observable will emit `false` and complete. If one of the observables completes,
 * the operator will wait for the other observable to complete; if the other observable emits a value that
 * has no buffered counterpart before completing, the returned observable will emit `false` and complete.
 * Once both observables have completed, the returned observable emits `true` only if no unmatched values
 * remain buffered on either side, and `false` otherwise. If, after one observable has completed, the other
 * one neither emits nor completes, the returned observable will never complete.
 *
 * ## Example
 *
 * Figure out if the Konami code matches
 *
 * ```ts
 * import { from, fromEvent, map, bufferCount, mergeMap, sequenceEqual } from 'rxjs';
 *
 * const codes = from([
 *   'ArrowUp',
 *   'ArrowUp',
 *   'ArrowDown',
 *   'ArrowDown',
 *   'ArrowLeft',
 *   'ArrowRight',
 *   'ArrowLeft',
 *   'ArrowRight',
 *   'KeyB',
 *   'KeyA',
 *   'Enter', // no start key, clearly.
 * ]);
 *
 * const keys = fromEvent<KeyboardEvent>(document, 'keyup').pipe(map(e => e.code));
 * const matches = keys.pipe(
 *   bufferCount(11, 1),
 *   mergeMap(last11 => from(last11).pipe(sequenceEqual(codes)))
 * );
 * matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
 * ```
 *
 * @see {@link combineLatest}
 * @see {@link zip}
 * @see {@link withLatestFrom}
 *
 * @param compareTo The `ObservableInput` sequence to compare the source sequence to.
 * @param comparator An optional function to compare each value pair. It is always called with
 * the value from the source as its first argument and the value from `compareTo` as its second
 * argument. Defaults to strict equality (`===`).
 *
 * @return A function that returns an Observable that emits a single boolean
 * value representing whether or not the values emitted by the source
 * Observable and provided `ObservableInput` were equal in sequence.
 */
export function sequenceEqual<T>(
  compareTo: ObservableInput<T>,
  comparator: (a: T, b: T) => boolean = (a, b) => a === b
): OperatorFunction<T, boolean> {
  return operate((source, subscriber) => {
    // The state for the source observable
    const aState = createState<T>();
    // The state for the compareTo observable
    const bState = createState<T>();

    /** A utility to emit the result and complete */
    const emit = (isEqual: boolean) => {
      subscriber.next(isEqual);
      subscriber.complete();
    };

    /**
     * Creates a subscriber that subscribes to one of the sources, and compares its collected
     * state -- `selfState` -- to the other source's collected state -- `otherState`. This
     * is used for both streams.
     *
     * `isMatch` receives the value that just arrived on this stream first and the buffered
     * value of the other stream second; each call site adapts it so that the user-supplied
     * `comparator` always sees `(sourceValue, compareToValue)` regardless of which stream
     * happened to emit last.
     */
    const createSubscriber = (
      selfState: SequenceState<T>,
      otherState: SequenceState<T>,
      isMatch: (selfValue: T, otherValue: T) => boolean
    ) => {
      const sequenceEqualSubscriber = createOperatorSubscriber(
        subscriber,
        (value: T) => {
          const { buffer, complete } = otherState;
          if (buffer.length === 0) {
            // If there are no values in the other buffer
            // and the other stream is complete, we know
            // this isn't a match, because we got one more value.
            // Otherwise, we push onto our buffer, so when the other
            // stream emits, it can pull this value off our buffer and check it
            // at the appropriate time.
            if (complete) {
              emit(false);
            } else {
              selfState.buffer.push(value);
            }
          } else if (!isMatch(value, buffer.shift()!)) {
            // The other stream *does* have values in its buffer: the oldest one
            // was pulled off and compared to what we just got. It wasn't a match,
            // so emit `false` and complete.
            emit(false);
          }
        },
        () => {
          // Our observable completed
          selfState.complete = true;
          // If the other observable is also complete, no more values can arrive
          // on either side. The sequences match only if nothing is left unmatched
          // in *either* buffer: leftovers in the other buffer are values we never
          // produced a counterpart for, and leftovers in our own buffer are values
          // the other stream never produced a counterpart for.
          if (otherState.complete) {
            emit(selfState.buffer.length === 0 && otherState.buffer.length === 0);
          }
          // Be sure to clean up our stream as soon as possible if we can.
          sequenceEqualSubscriber?.unsubscribe();
        }
      );

      return sequenceEqualSubscriber;
    };

    // Subscribe to each source. The argument order handed to `comparator` is
    // normalized here: source value first, compareTo value second.
    source.subscribe(createSubscriber(aState, bState, (sourceValue, otherValue) => comparator(sourceValue, otherValue)));
    innerFrom(compareTo).subscribe(createSubscriber(bState, aState, (otherValue, sourceValue) => comparator(sourceValue, otherValue)));
  });
}
| |
/**
 * Bookkeeping record kept for each of the two sequences being compared.
 */
interface SequenceState<T> {
  /** `true` once the sequence's source has completed. */
  complete: boolean;
  /** Values that have arrived but have not been checked yet. */
  buffer: T[];
}
| |
/**
 * Builds a fresh, empty state record for one sequence: nothing buffered
 * yet and not completed. Every call returns a new object with its own
 * buffer array.
 */
function createState<T>(): SequenceState<T> {
  const initialState: SequenceState<T> = { buffer: [], complete: false };
  return initialState;
}