// Stream base class and prototype-chain helper used by ctor() below.
var Readable = require('readable-stream').Readable
var inherits = require('inherits')

// The module's export is the from2() factory itself; the class builder and
// the object-mode shorthand are attached to it as properties.
module.exports = from2

from2.ctor = ctor
from2.obj = obj

// Stream class built once with no options and no read function; from2()
// instantiates it for every stream it creates.
var Proto = ctor()

/**
 * Turn an array into a `read(size, cb)` style source.
 *
 * Each call hands the next entry to `cb(err, item)`:
 *   - an entry that is an `Error` instance is reported as `cb(entry, null)`
 *   - any other entry is reported as `cb(null, entry)`
 *   - once the entries are used up, every call reports `cb(null, null)`
 *
 * @param {Array} list - entries to emit, in order. The array is copied up
 *   front, so the caller's array is never modified and later changes to it
 *   are not observed.
 * @returns {function(*, function)} reader that ignores its first argument.
 */
function toFunction(list) {
  var items = list.slice()
  // Walk the snapshot with a cursor rather than shift()-ing, so each read is
  // constant time regardless of how long the list is.
  var cursor = 0

  return function (_, cb) {
    var item = cursor < items.length ? items[cursor++] : null

    if (item instanceof Error) {
      cb(item, null)
      return
    }

    cb(null, item)
  }
}

/**
 * Create a readable stream from a read function or an array.
 *
 * Accepted call shapes:
 *   from2(opts, read)      - options object plus source
 *   from2(read)            - source only; options default to {}
 *   from2(undefined, read) - same as from2(read)
 *
 * @param {Object} [opts] - options handed to the stream constructor.
 * @param {function|Array} [read] - a `read(size, cb)` function, or an array
 *   that is wrapped with toFunction(). When missing, a no-op reader is used.
 * @returns {Object} a new instance of the default stream class.
 */
function from2(opts, read) {
  if (typeof opts !== 'object' || Array.isArray(opts)) {
    // The first argument is not an options object, so treat it as the
    // source. An explicit `undefined` placeholder is skipped so that it does
    // not overwrite a source passed in second position.
    if (opts !== undefined) read = opts
    opts = {}
  }

  var rs = new Proto(opts)
  // Installed on the instance, shadowing the prototype's default reader.
  rs._from = Array.isArray(read) ? toFunction(read) : (read || noop)
  return rs
}

/**
 * Build a Readable subclass whose `_read` pulls chunks from a
 * `read(size, cb)` function.
 *
 * @param {Object} [opts] - stream options used for every instance that is
 *   not given its own options object.
 * @param {function} [read] - default `read(size, cb)` source, stored on the
 *   prototype as `_from`; a no-op when omitted. May be passed as the only
 *   argument.
 * @returns {function} the stream class. It can be called with or without
 *   `new`, and takes an optional per-instance options object.
 */
function ctor(opts, read) {
  if (typeof opts === 'function') {
    read = opts
    opts = {}
  }

  opts = defaults(opts)

  inherits(Class, Readable)
  function Class(override) {
    if (!(this instanceof Class)) return new Class(override)
    this._reading = false
    this._callback = check
    this.destroyed = false
    Readable.call(this, override || opts)

    var self = this
    var hwm = this._readableState.highWaterMark

    // Callback given to the source on every read.
    // NOTE(review): when the source calls back synchronously, check ->
    // _read -> _from -> check nests until push() returns false; confirm the
    // depth is acceptable for small chunks with a large highWaterMark.
    function check(err, data) {
      if (self.destroyed) return
      if (err) return self.destroy(err)
      // A null chunk marks the end of the data.
      if (data === null) return self.push(null)
      self._reading = false
      // push() returning true means more can be pushed, so pull again now.
      if (self.push(data)) self._read(hwm)
    }
  }

  Class.prototype._from = read || noop

  // Ask the source for one chunk unless a request is already outstanding or
  // the stream has been destroyed.
  Class.prototype._read = function (size) {
    if (this._reading || this.destroyed) return
    this._reading = true
    this._from(size, this._callback)
  }

  /**
   * Mark the stream destroyed and, on the next tick, emit 'error' (only
   * when `err` is given) followed by 'close'. Calls after the first do
   * nothing further.
   *
   * @param {Error} [err] - error to emit before 'close'.
   * @returns {Object} this stream, so the call can be chained.
   */
  Class.prototype.destroy = function (err) {
    if (this.destroyed) return this
    this.destroyed = true

    var self = this
    process.nextTick(function () {
      if (err) self.emit('error', err)
      self.emit('close')
    })

    return this
  }

  return Class
}

/**
 * Shorthand for from2() that always creates an object-mode stream with a
 * highWaterMark of 16.
 *
 * @param {Object} [opts] - extra stream options. The object is copied, not
 *   modified; its `objectMode` and `highWaterMark` values are replaced in
 *   the copy.
 * @param {function|Array} [read] - source passed through to from2(). May be
 *   given as the only argument.
 * @returns {Object} the stream returned by from2().
 */
function obj(opts, read) {
  if (typeof opts === 'function' || Array.isArray(opts)) {
    read = opts
    opts = {}
  }

  // Work on a copy so the caller's options object is left untouched.
  // Inherited enumerable options are copied too, since they were visible
  // through the original object as well.
  var given = defaults(opts)
  var settings = {}
  for (var key in given) settings[key] = given[key]

  settings.objectMode = true
  settings.highWaterMark = 16

  return from2(settings, read)
}

/**
 * Reader used when no source is supplied: it accepts any arguments, never
 * calls back, and returns undefined.
 *
 * Kept as a function declaration so it is available while the module is
 * still initialising.
 */
function noop() {}

/**
 * Normalise an optional options argument.
 *
 * @param {Object} [opts] - options supplied by the caller.
 * @returns {Object} `opts` itself when it is truthy, otherwise a new empty
 *   object.
 */
function defaults(opts) {
  return opts || {}
}