| var util = require('util'); |
| var Stream = require('stream').Stream; |
| var DelayedStream = require('delayed-stream'); |
| var defer = require('./defer.js'); |
| |
| module.exports = CombinedStream; |
| function CombinedStream() { |
| this.writable = false; |
| this.readable = true; |
| this.dataSize = 0; |
| this.maxDataSize = 2 * 1024 * 1024; |
| this.pauseStreams = true; |
| |
| this._released = false; |
| this._streams = []; |
| this._currentStream = null; |
| } |
| util.inherits(CombinedStream, Stream); |
| |
| CombinedStream.create = function(options) { |
| var combinedStream = new this(); |
| |
| options = options || {}; |
| for (var option in options) { |
| combinedStream[option] = options[option]; |
| } |
| |
| return combinedStream; |
| }; |
| |
| CombinedStream.isStreamLike = function(stream) { |
| return (typeof stream !== 'function') |
| && (typeof stream !== 'string') |
| && (typeof stream !== 'boolean') |
| && (typeof stream !== 'number') |
| && (!Buffer.isBuffer(stream)); |
| }; |
| |
| CombinedStream.prototype.append = function(stream) { |
| var isStreamLike = CombinedStream.isStreamLike(stream); |
| |
| if (isStreamLike) { |
| if (!(stream instanceof DelayedStream)) { |
| var newStream = DelayedStream.create(stream, { |
| maxDataSize: Infinity, |
| pauseStream: this.pauseStreams, |
| }); |
| stream.on('data', this._checkDataSize.bind(this)); |
| stream = newStream; |
| } |
| |
| this._handleErrors(stream); |
| |
| if (this.pauseStreams) { |
| stream.pause(); |
| } |
| } |
| |
| this._streams.push(stream); |
| return this; |
| }; |
| |
| CombinedStream.prototype.pipe = function(dest, options) { |
| Stream.prototype.pipe.call(this, dest, options); |
| this.resume(); |
| return dest; |
| }; |
| |
| CombinedStream.prototype._getNext = function() { |
| this._currentStream = null; |
| var stream = this._streams.shift(); |
| |
| |
| if (typeof stream == 'undefined') { |
| this.end(); |
| return; |
| } |
| |
| if (typeof stream !== 'function') { |
| this._pipeNext(stream); |
| return; |
| } |
| |
| var getStream = stream; |
| getStream(function(stream) { |
| var isStreamLike = CombinedStream.isStreamLike(stream); |
| if (isStreamLike) { |
| stream.on('data', this._checkDataSize.bind(this)); |
| this._handleErrors(stream); |
| } |
| |
| defer(this._pipeNext.bind(this, stream)); |
| }.bind(this)); |
| }; |
| |
| CombinedStream.prototype._pipeNext = function(stream) { |
| this._currentStream = stream; |
| |
| var isStreamLike = CombinedStream.isStreamLike(stream); |
| if (isStreamLike) { |
| stream.on('end', this._getNext.bind(this)); |
| stream.pipe(this, {end: false}); |
| return; |
| } |
| |
| var value = stream; |
| this.write(value); |
| this._getNext(); |
| }; |
| |
| CombinedStream.prototype._handleErrors = function(stream) { |
| var self = this; |
| stream.on('error', function(err) { |
| self._emitError(err); |
| }); |
| }; |
| |
| CombinedStream.prototype.write = function(data) { |
| this.emit('data', data); |
| }; |
| |
| CombinedStream.prototype.pause = function() { |
| if (!this.pauseStreams) { |
| return; |
| } |
| |
| if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause(); |
| this.emit('pause'); |
| }; |
| |
| CombinedStream.prototype.resume = function() { |
| if (!this._released) { |
| this._released = true; |
| this.writable = true; |
| this._getNext(); |
| } |
| |
| if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume(); |
| this.emit('resume'); |
| }; |
| |
| CombinedStream.prototype.end = function() { |
| this._reset(); |
| this.emit('end'); |
| }; |
| |
| CombinedStream.prototype.destroy = function() { |
| this._reset(); |
| this.emit('close'); |
| }; |
| |
| CombinedStream.prototype._reset = function() { |
| this.writable = false; |
| this._streams = []; |
| this._currentStream = null; |
| }; |
| |
| CombinedStream.prototype._checkDataSize = function() { |
| this._updateDataSize(); |
| if (this.dataSize <= this.maxDataSize) { |
| return; |
| } |
| |
| var message = |
| 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'; |
| this._emitError(new Error(message)); |
| }; |
| |
| CombinedStream.prototype._updateDataSize = function() { |
| this.dataSize = 0; |
| |
| var self = this; |
| this._streams.forEach(function(stream) { |
| if (!stream.dataSize) { |
| return; |
| } |
| |
| self.dataSize += stream.dataSize; |
| }); |
| |
| if (this._currentStream && this._currentStream.dataSize) { |
| this.dataSize += this._currentStream.dataSize; |
| } |
| }; |
| |
| CombinedStream.prototype._emitError = function(err) { |
| this._reset(); |
| this.emit('error', err); |
| }; |