blob: ece67345b14a43524e685480ae5ca2f4c66b144e [file] [log] [blame]
'use strict'
/*
* merge2
* https://github.com/teambition/merge2
*
* Copyright (c) 2014-2016 Teambition
* Licensed under the MIT license.
*/
import Stream from 'stream'
const PassThrough = Stream.PassThrough
const slice = Array.prototype.slice
function merge2 () {
const streamsQueue = []
let merging = false
let args = slice.call(arguments)
let options = args[args.length - 1]
if (options && !Array.isArray(options) && options.pipe == null) args.pop()
else options = {}
let doEnd = options.end !== false
if (options.objectMode == null) options.objectMode = true
if (options.highWaterMark == null) options.highWaterMark = 64 * 1024
const mergedStream = PassThrough(options)
function addStream () {
for (let i = 0, len = arguments.length; i < len; i++) {
streamsQueue.push(pauseStreams(arguments[i], options))
}
mergeStream()
return this
}
function mergeStream () {
if (merging) return
merging = true
let streams = streamsQueue.shift()
if (!streams) {
process.nextTick(endStream)
return
}
if (!Array.isArray(streams)) streams = [streams]
let pipesCount = streams.length + 1
function next () {
if (--pipesCount > 0) return
merging = false
mergeStream()
}
function pipe (stream) {
function onend () {
stream.removeListener('merge2UnpipeEnd', onend)
stream.removeListener('end', onend)
next()
}
// skip ended stream
if (stream._readableState.endEmitted) return next()
stream.on('merge2UnpipeEnd', onend)
stream.on('end', onend)
stream.pipe(mergedStream, { end: false })
// compatible for old stream
stream.resume()
}
for (let i = 0; i < streams.length; i++) pipe(streams[i])
next()
}
function endStream () {
merging = false
// emit 'queueDrain' when all streams merged.
mergedStream.emit('queueDrain')
return doEnd && mergedStream.end()
}
mergedStream.setMaxListeners(0)
mergedStream.add = addStream
mergedStream.on('unpipe', function (stream) {
stream.emit('merge2UnpipeEnd')
})
if (args.length) addStream.apply(null, args)
return mergedStream
}
// check and pause streams for pipe.
function pauseStreams (streams, options) {
if (!Array.isArray(streams)) {
// Backwards-compat with old-style streams
if (!streams._readableState && streams.pipe) streams = streams.pipe(PassThrough(options))
if (!streams._readableState || !streams.pause || !streams.pipe) {
throw new Error('Only readable stream can be merged.')
}
streams.pause()
} else {
for (let i = 0, len = streams.length; i < len; i++) streams[i] = pauseStreams(streams[i], options)
}
return streams
}
export default merge2
export { merge2 }