| var Transform = require('readable-stream').Transform; |
| var inherits = require('inherits'); |
| var cyclist = require('cyclist'); |
| var util = require('util'); |
| |
/**
 * Transform stream that hands each incoming chunk to `ontransform` and keeps
 * up to `maxParallel` of those calls in flight at the same time.
 *
 * May be called with or without `new`. Accepted call shapes:
 *   ParallelTransform(ontransform)                    -> maxParallel = 1
 *   ParallelTransform(maxParallel, ontransform)
 *   ParallelTransform(maxParallel, opts, ontransform)
 *
 * @param {number} maxParallel - Maximum number of unfinished `ontransform`
 *   calls before the write callback is held back.
 * @param {Object} [opts] - Options forwarded to the `Transform` constructor.
 *   `highWaterMark` defaults to `Math.max(maxParallel, 16)` when falsy,
 *   `objectMode` defaults to `true` unless explicitly `false`, and
 *   `ordered: false` selects a plain array instead of a cyclist ring buffer.
 *   The object passed in is never modified.
 * @param {function(*, function(Error=, *=))} ontransform - Called as
 *   `ontransform(chunk, done)`; `done(err, data)` reports the result.
 */
var ParallelTransform = function(maxParallel, opts, ontransform) {
  if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform);

  if (typeof maxParallel === 'function') {
    ontransform = maxParallel;
    opts = null;
    maxParallel = 1;
  }
  if (typeof opts === 'function') {
    ontransform = opts;
    opts = null;
  }

  // Apply defaults to a shallow copy so the caller's options object is left
  // untouched (for-in over null/undefined simply iterates nothing).
  var streamOpts = {};
  for (var key in opts) streamOpts[key] = opts[key];
  if (!streamOpts.highWaterMark) streamOpts.highWaterMark = Math.max(maxParallel, 16);
  if (streamOpts.objectMode !== false) streamOpts.objectMode = true;

  Transform.call(this, streamOpts);

  this._maxParallel = maxParallel;
  this._ontransform = ontransform;
  this._destroyed = false;
  this._flushed = false;
  this._ordered = streamOpts.ordered !== false;
  // Ordered mode stores results by position; unordered mode just queues them.
  this._buffer = this._ordered ? cyclist(maxParallel) : [];
  this._top = 0;      // number of chunks handed to ontransform so far
  this._bottom = 0;   // number of results already drained from the buffer
  this._ondrain = null;
};

inherits(ParallelTransform, Transform);
| |
/**
 * Marks the stream as destroyed and emits 'close'.
 *
 * Only the first call has any effect; repeated calls do nothing. Once the
 * flag is set, results arriving from still-running `ontransform` calls are
 * discarded by `_transform`.
 */
ParallelTransform.prototype.destroy = function() {
  if (!this._destroyed) {
    this._destroyed = true;
    this.emit('close');
  }
};
| |
/**
 * Stream `_transform` hook: starts `ontransform` for one chunk and decides
 * whether the next chunk may be accepted right away.
 *
 * Each chunk gets a position (`_top` before the increment). When the user
 * callback reports a result it is stored in the buffer and `_drain()` is run.
 * A result of `undefined` or `null` is stored as `null`, which `_drain`
 * treats as "emit nothing" — this now applies to unordered mode as well as
 * ordered mode. On error the stream emits 'error', pushes `null` and destroys
 * itself. Results that arrive after destruction are ignored.
 *
 * @param {*} chunk - The chunk to transform.
 * @param {string} enc - Encoding supplied by the stream machinery (unused).
 * @param {function()} callback - Write callback. Invoked immediately while
 *   fewer than `_maxParallel` chunks are in flight, otherwise parked in
 *   `_ondrain` until `_drain` releases it.
 */
ParallelTransform.prototype._transform = function(chunk, enc, callback) {
  var self = this;
  var pos = this._top++;

  this._ontransform(chunk, function(err, data) {
    if (self._destroyed) return;
    if (err) {
      self.emit('error', err);
      self.push(null);
      self.destroy();
      return;
    }

    // Normalize "no output" to null for both buffer types so undefined is
    // never forwarded to push().
    var result = (data === undefined || data === null) ? null : data;
    if (self._ordered) {
      self._buffer.put(pos, result);
    }
    else {
      self._buffer.push(result);
    }
    self._drain();
  });

  if (this._top - this._bottom < this._maxParallel) return callback();
  this._ondrain = callback;
};
| |
/**
 * Stream `_flush` hook: records that no more chunks will arrive and parks
 * `callback` until every in-flight transform has been drained.
 *
 * With `_flushed` set, `_drained()` only reports true once `_top` equals
 * `_bottom`, so `_drain()` invokes the callback either immediately (nothing
 * pending) or after the last outstanding result has been emitted.
 *
 * @param {function()} callback - Completion callback stored in `_ondrain`.
 */
ParallelTransform.prototype._flush = function(callback) {
  this._ondrain = callback;
  this._flushed = true;
  this._drain();
};
| |
/**
 * Emits every buffered result that is ready and, if capacity has been freed,
 * releases the parked write/flush callback.
 *
 * Ordered mode walks the ring buffer from `_bottom` and stops at the first
 * slot that is still `undefined` (result not delivered yet), so output keeps
 * input order. Unordered mode empties the array. In both modes `_bottom` is
 * advanced once per consumed result, and `null` results are consumed without
 * being pushed. Unordered mode additionally skips `undefined`, matching the
 * ordered path where such results are never emitted.
 *
 * The parked callback in `_ondrain` is cleared before it is invoked so that a
 * re-entrant call cannot fire it twice.
 */
ParallelTransform.prototype._drain = function() {
  var data;

  if (this._ordered) {
    while (this._buffer.get(this._bottom) !== undefined) {
      data = this._buffer.del(this._bottom++);
      if (data === null) continue;
      this.push(data);
    }
  }
  else {
    while (this._buffer.length > 0) {
      data = this._buffer.pop();
      this._bottom++;
      // An "empty" result must never reach push(), whichever form it takes.
      if (data === null || data === undefined) continue;
      this.push(data);
    }
  }

  if (!this._drained() || !this._ondrain) return;

  var ondrain = this._ondrain;
  this._ondrain = null;
  ondrain();
};
| |
/**
 * Reports whether the parked callback may be released.
 *
 * `_top - _bottom` is the number of chunks handed to `ontransform` whose
 * results have not been drained yet. After `_flush` that number must be zero;
 * before it, it only has to be below `_maxParallel`.
 *
 * @returns {boolean} True when the stream can accept more work (or finish).
 */
ParallelTransform.prototype._drained = function() {
  var inFlight = this._top - this._bottom;
  if (this._flushed) return !inFlight;
  return inFlight < this._maxParallel;
};
| |
| module.exports = ParallelTransform; |