// Stream base classes come from the `readable-stream` package; `inherits`
// is the helper used to derive Pipeline from Duplex.
var Duplex = require('readable-stream').Duplex;
var PassThrough = require('readable-stream').PassThrough;
var Readable = require('readable-stream').Readable;
var inherits = require('inherits');
| |
// Deferred-callback scheduler: `setImmediate` where the runtime defines it,
// otherwise `process.nextTick`.
var nextTick = typeof setImmediate !== 'undefined'
    ? setImmediate
    : process.nextTick;
| |
// `Pipeline` is a function declaration further down in this file, so it is
// already hoisted and can be exported and derived from Duplex here.
module.exports = Pipeline;
inherits(Pipeline, Duplex);
| |
/**
 * Build a Pipeline with `objectMode` forced to `true`.
 *
 * Both arguments are optional. When `opts` is omitted and the first argument
 * is not an array, that argument is treated as `opts`.
 *
 * @param {Array} [streams] - Initial streams for the pipeline.
 * @param {Object} [opts] - Options passed on to the Pipeline constructor.
 * @returns {Pipeline} A new pipeline created with `objectMode: true`.
 */
module.exports.obj = function (streams, opts) {
    if (!opts && !Array.isArray(streams)) {
        opts = streams;
        streams = [];
    }
    if (!streams) streams = [];

    // Work on a copy so the caller's options object is not modified when
    // `objectMode` is forced on. A bare for-in keeps inherited enumerable
    // keys too, so the copy exposes the same option values as the original.
    var objOpts = {};
    for (var key in opts) objOpts[key] = opts[key];
    objOpts.objectMode = true;

    return new Pipeline(streams, objOpts);
};
| |
/**
 * Duplex stream that fronts an ordered, editable list of inner streams.
 *
 * Callable with or without `new`. Both arguments are optional; when `opts`
 * is omitted and the first argument is not an array, that argument is
 * treated as `opts`.
 *
 * @param {Array} [streams] - Initial streams, inserted via `splice(0, 0, ...)`.
 * @param {Object} [opts] - Options passed to the Duplex constructor and kept
 *     on `this._options`.
 */
function Pipeline (streams, opts) {
    if (!(this instanceof Pipeline)) return new Pipeline(streams, opts);
    if (!opts && !Array.isArray(streams)) {
        opts = streams;
        streams = [];
    }
    if (!streams) streams = [];
    if (!opts) opts = {};
    Duplex.call(this, opts);

    var self = this;
    this._options = opts;
    // Options for wrapping streams that lack `read()`: object mode unless the
    // caller explicitly passed `objectMode: false`.
    this._wrapOptions = { objectMode: opts.objectMode !== false };
    this._streams = [];

    this.splice.apply(this, [ 0, 0 ].concat(streams));

    // When the writable side finishes, end the first inner stream (creating
    // a placeholder first if the pipeline is empty).
    this.once('finish', function () {
        self._notEmpty();
        self._streams[0].end();
    });
}
| |
/**
 * Readable-side hook: drain whatever the last inner stream has buffered and
 * push it out of the pipeline.
 *
 * If nothing was available, wait for either the last stream to become
 * readable or the pipeline to be mutated, then try again.
 */
Pipeline.prototype._read = function () {
    var self = this;
    this._notEmpty();

    var r = this._streams[this._streams.length - 1];
    var buf, reads = 0;
    while ((buf = r.read()) !== null) {
        // Call Duplex's push explicitly: `Pipeline.prototype.push` is
        // redefined to append streams, not data.
        Duplex.prototype.push.call(this, buf);
        reads++;
    }

    if (reads === 0) {
        var onreadable = function () {
            // The same handler is registered on two emitters; drop both
            // registrations so only one retry happens.
            r.removeListener('readable', onreadable);
            self.removeListener('_mutate', onreadable);
            self._read();
        };
        r.once('readable', onreadable);
        self.once('_mutate', onreadable);
    }
};
| |
/**
 * Writable-side hook: hand the chunk directly to the first inner stream's
 * `_write`, creating a placeholder stream first if the pipeline is empty.
 *
 * @param {*} buf - Chunk being written.
 * @param {string} enc - Encoding supplied for the chunk.
 * @param {Function} next - Completion callback, passed through unchanged.
 */
Pipeline.prototype._write = function (buf, enc, next) {
    this._notEmpty();
    this._streams[0]._write(buf, enc, next);
};
| |
/**
 * Guarantee the pipeline holds at least one inner stream.
 *
 * Does nothing when streams are already present. Otherwise appends a
 * PassThrough built from the pipeline's own options and updates `length`.
 */
Pipeline.prototype._notEmpty = function () {
    var self = this;
    if (this._streams.length > 0) return;

    var stream = new PassThrough(this._options);
    stream.once('end', function () {
        // Only signal end-of-data if this placeholder is still the last
        // stream in the pipeline when it ends.
        var ix = self._streams.indexOf(stream);
        if (ix >= 0 && ix === self._streams.length - 1) {
            Duplex.prototype.push.call(self, null);
        }
    });
    this._streams.push(stream);
    this.length = this._streams.length;
};
| |
/**
 * Append one or more streams to the end of the pipeline.
 *
 * Every argument is forwarded to `splice` as an insertion at the current end.
 *
 * @param {...*} stream - Streams to append.
 * @returns {number} The number of inner streams after the insertion.
 */
Pipeline.prototype.push = function (stream) {
    var args = [ this._streams.length, 0 ].concat([].slice.call(arguments));
    this.splice.apply(this, args);
    return this._streams.length;
};
| |
/**
 * Remove the last inner stream via `splice`.
 *
 * @returns {*} The removed stream, or `undefined` if `splice` removed nothing.
 */
Pipeline.prototype.pop = function () {
    return this.splice(this._streams.length - 1, 1)[0];
};
| |
/**
 * Remove the first inner stream via `splice`.
 *
 * @returns {*} The removed stream, or `undefined` if `splice` removed nothing.
 */
Pipeline.prototype.shift = function () {
    return this.splice(0, 1)[0];
};
| |
/**
 * Insert one or more streams at the front of the pipeline.
 *
 * Every argument is forwarded to `splice` as an insertion at index 0.
 *
 * @returns {number} The number of inner streams after the insertion.
 */
Pipeline.prototype.unshift = function () {
    this.splice.apply(this, [ 0, 0 ].concat([].slice.call(arguments)));
    return this._streams.length;
};
| |
/**
 * Remove and/or insert inner streams, keeping neighbouring streams piped
 * together.
 *
 * Arguments follow `Array.prototype.splice`: a negative `start` counts back
 * from the end, a `start` past the end is treated as the end, and an omitted
 * `removeLen` removes everything from `start` onwards. Any further arguments
 * are the streams to insert; an array argument is turned into a nested
 * Pipeline built with this pipeline's options.
 *
 * Emits `'_mutate'` and refreshes `this.length` once the list has changed.
 * Errors emitted by an inserted stream are re-emitted on the pipeline with
 * `err.stream` set to the emitting stream.
 *
 * @param {number} start - Index at which to begin changing the list.
 * @param {number} [removeLen] - Number of streams to remove.
 * @returns {Array} The streams that were removed.
 */
Pipeline.prototype.splice = function (start, removeLen) {
    var self = this;
    var len = this._streams.length;

    // Negative indexes count back from the end, and out-of-range indexes are
    // clamped, so the pipe/unpipe bookkeeping below targets the same slots
    // that the array splice at the bottom will touch.
    if (start < 0) start = Math.max(0, len + start);
    else if (start > len) start = len;

    if (removeLen === undefined) removeLen = len - start;
    removeLen = Math.max(0, Math.min(len - start, removeLen));
    var end = start + removeLen;

    // Disconnect every link that touches the affected range, from
    // (start-1 -> start) through (end-1 -> end).
    for (var i = start; i <= end; i++) {
        if (self._streams[i - 1] && self._streams[i]) {
            self._streams[i - 1].unpipe(self._streams[i]);
        }
    }

    var reps = [];
    for (var j = 2; j < arguments.length; j++) {
        reps.push(prepare(arguments[j]));
    }

    var before = self._streams[start - 1];
    var after = self._streams[end];
    if (reps.length) {
        for (var k = 0; k < reps.length - 1; k++) {
            reps[k].pipe(reps[k + 1]);
        }
        if (after) reps[reps.length - 1].pipe(after);
        if (before) before.pipe(reps[0]);
    }
    else if (before && after) {
        // Nothing was inserted: reconnect the streams on either side of the
        // gap so data keeps flowing through the pipeline.
        before.pipe(after);
    }

    var removed = self._streams.splice.apply(
        self._streams, [ start, removeLen ].concat(reps)
    );

    for (var m = 0; m < reps.length; m++) {
        reps[m].read(0);
    }

    this.emit('_mutate');
    this.length = this._streams.length;
    return removed;

    // Turn one inserted argument into a stream ready to be piped: build a
    // nested pipeline from arrays, forward errors, wrap streams without
    // `read()`, and watch for the end of data.
    function prepare (stream) {
        if (Array.isArray(stream)) {
            stream = new Pipeline(stream, self._options);
        }
        stream.on('error', function (err) {
            err.stream = this;
            self.emit('error', err);
        });
        stream = self._wrapStream(stream);
        stream.once('end', function () {
            // Only the stream that is last when it ends closes the
            // pipeline's readable side.
            var ix = self._streams.indexOf(stream);
            if (ix >= 0 && ix === self._streams.length - 1) {
                Duplex.prototype.push.call(self, null);
            }
        });
        return stream;
    }
};
| |
/**
 * Look up an inner stream by index, descending into nested pipelines when
 * more than one index is given.
 *
 * Negative indexes count back from the end of the list at that level.
 *
 * @param {...number} index - One index per nesting level.
 * @returns {*} The stream found, or `undefined` when called without
 *     arguments, when any index misses, or when the path tries to descend
 *     into something that has no `_streams` list.
 */
Pipeline.prototype.get = function () {
    if (arguments.length === 0) return undefined;

    var base = this;
    for (var i = 0; i < arguments.length; i++) {
        var children = base._streams;
        // A plain (non-pipeline) stream has no nested list to index into.
        if (!children) return undefined;

        var index = arguments[i];
        base = children[index < 0 ? children.length + index : index];
        if (!base) return undefined;
    }
    return base;
};
| |
/**
 * Find the position of a stream in this pipeline's inner stream list.
 *
 * @param {*} stream - Stream to look for.
 * @returns {number} Its index in `this._streams`, or -1 if it is not there.
 */
Pipeline.prototype.indexOf = function (stream) {
    return this._streams.indexOf(stream);
};
| |
/**
 * Adapt a stream that has no `read()` method so it can sit in the pipeline.
 *
 * Streams that already expose `read()` are returned untouched. Anything else
 * is wrapped with `Readable#wrap` (using `this._wrapOptions`) and given a
 * `_write` that forwards chunks to the underlying stream's `write()`.
 *
 * @param {*} stream - Stream to adapt.
 * @returns {*} `stream` itself, or the Readable wrapper around it.
 */
Pipeline.prototype._wrapStream = function (stream) {
    if (typeof stream.read === 'function') return stream;

    var w = new Readable(this._wrapOptions).wrap(stream);
    w._write = function (buf, enc, next) {
        // NOTE(review): `enc` is not forwarded to the wrapped stream's
        // write(); confirm that is intended.
        if (stream.write(buf) === false) {
            // The underlying stream signalled it is full: continue on 'drain'.
            stream.once('drain', next);
        }
        else nextTick(next);
    };
    return w;
};