| 'use strict'; |
| const {Transform} = require('stream'); |
| |
| class ObjectTransform extends Transform { |
| constructor() { |
| super({ |
| objectMode: true |
| }); |
| } |
| } |
| |
| class FilterStream extends ObjectTransform { |
| constructor(filter) { |
| super(); |
| this._filter = filter; |
| } |
| |
| _transform(data, encoding, callback) { |
| if (this._filter(data)) { |
| this.push(data); |
| } |
| |
| callback(); |
| } |
| } |
| |
| class UniqueStream extends ObjectTransform { |
| constructor() { |
| super(); |
| this._pushed = new Set(); |
| } |
| |
| _transform(data, encoding, callback) { |
| if (!this._pushed.has(data)) { |
| this.push(data); |
| this._pushed.add(data); |
| } |
| |
| callback(); |
| } |
| } |
| |
| module.exports = { |
| FilterStream, |
| UniqueStream |
| }; |