// The module's sole export is the collect() function defined below.
module.exports = collect

/**
 * Pause `stream` and record what it emits until somebody pipes it.
 *
 * 'data' chunks, the 'end' event and 'entry' children are buffered, and any
 * object announced through a 'proxy' event is paused. `stream.pipe` is
 * temporarily replaced: calling it hands the buffered entries on one at a
 * time, then restores the original `pipe`, re-emits the buffered data/end
 * events and resumes the stream. Calling `pipe()` with no destination
 * replays the events on `stream` itself.
 *
 * @param {Object} stream - event emitter that exposes `pause()`, `resume()`,
 *   `pipe()` and a `_paused` flag; it is tagged with `_collected = true`.
 * @returns {*} `undefined`, or the result of registering the deferred
 *   'resume' listener when the stream is currently paused.
 */
function collect (stream) {
  // Already collecting: a second pass would duplicate every buffer.
  if (stream._collected) return

  // A paused stream is retried when it resumes. `once` is used so repeated
  // calls and later resumes do not leave listeners behind.
  if (stream._paused) return stream.once('resume', collect.bind(null, stream))

  stream._collected = true
  stream.pause()

  // Chunks in arrival order. 'end' calls save() with no argument, so an
  // `undefined` slot marks the position of the end event.
  var buf = []
  stream.on('data', save)
  stream.on('end', save)
  function save (b) {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    if (typeof b === 'string') b = Buffer.from(b)
    // Empty buffers are falsy-free but carry nothing; skip them.
    if (Buffer.isBuffer(b) && !b.length) return
    buf.push(b)
  }

  // Child entries are collected recursively and kept for later release.
  var entryBuffer = []
  stream.on('entry', saveEntry)
  function saveEntry (e) {
    collect(e)
    entryBuffer.push(e)
  }

  stream.on('proxy', proxyPause)
  function proxyPause (p) {
    p.pause()
  }

  // replace the pipe method with a new version that will
  // unlock the buffered stuff. if you just call .pipe()
  // without a destination, then it'll re-play the events.
  stream.pipe = (function (orig) {
    return function (dest, opts) {
      // let the entries flow through one at a time.
      // Once they're all done, then we can resume completely.
      var e = 0
      ;(function unblockEntry () {
        var entry = entryBuffer[e++]
        if (!entry) return resume()
        entry.on('end', unblockEntry)
        if (dest) dest.add(entry)
        else stream.emit('entry', entry)
      })()

      function resume () {
        // Stop buffering before replaying, otherwise the replayed events
        // would be captured again.
        stream.removeListener('entry', saveEntry)
        stream.removeListener('data', save)
        stream.removeListener('end', save)

        // Restore the real pipe and forward the caller's options to it.
        stream.pipe = orig
        if (dest) stream.pipe(dest, opts)

        buf.forEach(function (b) {
          if (b) stream.emit('data', b)
          else stream.emit('end')
        })

        stream.resume()
      }

      return dest
    }
  })(stream.pipe)
}