| import { Observable } from '../Observable'; |
| import { innerFrom } from '../observable/innerFrom'; |
| import { Subject } from '../Subject'; |
| import { operate } from '../util/lift'; |
| import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber'; |
| export function groupBy(keySelector, elementOrOptions, duration, connector) { |
| return operate(function (source, subscriber) { |
| var element; |
| if (!elementOrOptions || typeof elementOrOptions === 'function') { |
| element = elementOrOptions; |
| } |
| else { |
| (duration = elementOrOptions.duration, element = elementOrOptions.element, connector = elementOrOptions.connector); |
| } |
| var groups = new Map(); |
| var notify = function (cb) { |
| groups.forEach(cb); |
| cb(subscriber); |
| }; |
| var handleError = function (err) { return notify(function (consumer) { return consumer.error(err); }); }; |
| var activeGroups = 0; |
| var teardownAttempted = false; |
| var groupBySourceSubscriber = new OperatorSubscriber(subscriber, function (value) { |
| try { |
| var key_1 = keySelector(value); |
| var group_1 = groups.get(key_1); |
| if (!group_1) { |
| groups.set(key_1, (group_1 = connector ? connector() : new Subject())); |
| var grouped = createGroupedObservable(key_1, group_1); |
| subscriber.next(grouped); |
| if (duration) { |
| var durationSubscriber_1 = createOperatorSubscriber(group_1, function () { |
| group_1.complete(); |
| durationSubscriber_1 === null || durationSubscriber_1 === void 0 ? void 0 : durationSubscriber_1.unsubscribe(); |
| }, undefined, undefined, function () { return groups.delete(key_1); }); |
| groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber_1)); |
| } |
| } |
| group_1.next(element ? element(value) : value); |
| } |
| catch (err) { |
| handleError(err); |
| } |
| }, function () { return notify(function (consumer) { return consumer.complete(); }); }, handleError, function () { return groups.clear(); }, function () { |
| teardownAttempted = true; |
| return activeGroups === 0; |
| }); |
| source.subscribe(groupBySourceSubscriber); |
| function createGroupedObservable(key, groupSubject) { |
| var result = new Observable(function (groupSubscriber) { |
| activeGroups++; |
| var innerSub = groupSubject.subscribe(groupSubscriber); |
| return function () { |
| innerSub.unsubscribe(); |
| --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe(); |
| }; |
| }); |
| result.key = key; |
| return result; |
| } |
| }); |
| } |
| //# sourceMappingURL=groupBy.js.map |