| const Minipass = require('minipass') |
| const EE = require('events') |
| const isStream = s => s && s instanceof EE && ( |
| typeof s.pipe === 'function' || // readable |
| (typeof s.write === 'function' && typeof s.end === 'function') // writable |
| ) |
| |
| const _head = Symbol('_head') |
| const _tail = Symbol('_tail') |
| const _linkStreams = Symbol('_linkStreams') |
| const _setHead = Symbol('_setHead') |
| const _setTail = Symbol('_setTail') |
| const _onError = Symbol('_onError') |
| const _onData = Symbol('_onData') |
| const _onEnd = Symbol('_onEnd') |
| const _onDrain = Symbol('_onDrain') |
| const _streams = Symbol('_streams') |
| class Pipeline extends Minipass { |
| constructor (opts, ...streams) { |
| if (isStream(opts)) { |
| streams.unshift(opts) |
| opts = {} |
| } |
| |
| super(opts) |
| this[_streams] = [] |
| if (streams.length) |
| this.push(...streams) |
| } |
| |
| [_linkStreams] (streams) { |
| // reduce takes (left,right), and we return right to make it the |
| // new left value. |
| return streams.reduce((src, dest) => { |
| src.on('error', er => dest.emit('error', er)) |
| src.pipe(dest) |
| return dest |
| }) |
| } |
| |
| push (...streams) { |
| this[_streams].push(...streams) |
| if (this[_tail]) |
| streams.unshift(this[_tail]) |
| |
| const linkRet = this[_linkStreams](streams) |
| |
| this[_setTail](linkRet) |
| if (!this[_head]) |
| this[_setHead](streams[0]) |
| } |
| |
| unshift (...streams) { |
| this[_streams].unshift(...streams) |
| if (this[_head]) |
| streams.push(this[_head]) |
| |
| const linkRet = this[_linkStreams](streams) |
| this[_setHead](streams[0]) |
| if (!this[_tail]) |
| this[_setTail](linkRet) |
| } |
| |
| destroy (er) { |
| // set fire to the whole thing. |
| this[_streams].forEach(s => |
| typeof s.destroy === 'function' && s.destroy()) |
| return super.destroy(er) |
| } |
| |
| // readable interface -> tail |
| [_setTail] (stream) { |
| this[_tail] = stream |
| stream.on('error', er => this[_onError](stream, er)) |
| stream.on('data', chunk => this[_onData](stream, chunk)) |
| stream.on('end', () => this[_onEnd](stream)) |
| stream.on('finish', () => this[_onEnd](stream)) |
| } |
| |
| // errors proxied down the pipeline |
| // they're considered part of the "read" interface |
| [_onError] (stream, er) { |
| if (stream === this[_tail]) |
| this.emit('error', er) |
| } |
| [_onData] (stream, chunk) { |
| if (stream === this[_tail]) |
| super.write(chunk) |
| } |
| [_onEnd] (stream) { |
| if (stream === this[_tail]) |
| super.end() |
| } |
| pause () { |
| super.pause() |
| return this[_tail] && this[_tail].pause && this[_tail].pause() |
| } |
| |
| // NB: Minipass calls its internal private [RESUME] method during |
| // pipe drains, to avoid hazards where stream.resume() is overridden. |
| // Thus, we need to listen to the resume *event*, not override the |
| // resume() method, and proxy *that* to the tail. |
| emit (ev, ...args) { |
| if (ev === 'resume' && this[_tail] && this[_tail].resume) |
| this[_tail].resume() |
| return super.emit(ev, ...args) |
| } |
| |
| // writable interface -> head |
| [_setHead] (stream) { |
| this[_head] = stream |
| stream.on('drain', () => this[_onDrain](stream)) |
| } |
| [_onDrain] (stream) { |
| if (stream === this[_head]) |
| this.emit('drain') |
| } |
| write (chunk, enc, cb) { |
| return this[_head].write(chunk, enc, cb) && |
| (this.flowing || this.buffer.length === 0) |
| } |
| end (chunk, enc, cb) { |
| this[_head].end(chunk, enc, cb) |
| return this |
| } |
| } |
| |
| module.exports = Pipeline |