| // |
| // |
| // |
| |
| 'use strict'; |
| |
| // A Mux is an object into which other readable streams may be piped; |
| // it then writes 'packets' from the upstreams to the given |
| // downstream. |
| |
| var inherits = require('util').inherits; |
| var assert = require('assert'); |
| |
| var schedule = (typeof setImmediate === 'function') ? |
| setImmediate : process.nextTick; |
| |
| function Mux(downstream) { |
| this.newStreams = []; |
| this.oldStreams = []; |
| this.blocked = false; |
| this.scheduledRead = false; |
| |
| this.out = downstream; |
| var self = this; |
| downstream.on('drain', function() { |
| self.blocked = false; |
| self._readIncoming(); |
| }); |
| } |
| |
| // There are 2 states we can be in: |
| |
| // - waiting for outbound capacity, which will be signalled by a |
| // - 'drain' event on the downstream; or, |
| |
| // - no packets to send, waiting for an inbound buffer to have |
| // packets, which will be signalled by a 'readable' event |
| |
| // If we write all packets available whenever there is outbound |
| // capacity, we will either run out of outbound capacity (`#write` |
| // returns false), or run out of packets (all calls to an |
| // `inbound.read()` have returned null). |
| |
| Mux.prototype._readIncoming = function() { |
| |
| // We may be sent here speculatively, if an incoming stream has |
| // become readable |
| if (this.blocked) return; |
| |
| var self = this; |
| var accepting = true; |
| var out = this.out; |
| |
| // Try to read a chunk from each stream in turn, until all streams |
| // are empty, or we exhaust our ability to accept chunks. |
| function roundrobin(streams) { |
| var s; |
| // if there's just one incoming stream we don't have to |
| // go through all the dequeue/enqueueing |
| if (streams.length === 1) { |
| s = streams.shift(); |
| while (accepting) { |
| var chunk = s.read(); |
| if (chunk !== null) { |
| accepting = out.write(chunk); |
| } |
| else break; |
| } |
| if (!accepting) streams.push(s); |
| } |
| else { |
| while (accepting && (s = streams.shift())) { |
| var chunk = s.read(); |
| if (chunk !== null) { |
| accepting = out.write(chunk); |
| streams.push(s); |
| } |
| } |
| } |
| } |
| |
| roundrobin(this.newStreams); |
| |
| // Either we exhausted the new queues, or we ran out of capacity. If |
| // we ran out of capacity, all the remaining new streams (i.e., |
| // those with packets left) become old streams. This effectively |
| // prioritises streams that keep their buffers close to empty over |
| // those that are constantly near full. |
| |
| if (accepting) { // all new queues are exhausted, write as many as |
| // we can from the old streams |
| assert.equal(0, this.newStreams.length); |
| roundrobin(this.oldStreams); |
| } |
| else { // ran out of room |
| assert(this.newStreams.length > 0, "Expect some new streams to remain"); |
| this.oldStreams = this.oldStreams.concat(this.newStreams); |
| this.newStreams = []; |
| } |
| // We may have exhausted all the old queues, or run out of room; |
| // either way, all we need to do is record whether we have capacity |
| // or not, so any speculative reads will know |
| this.blocked = !accepting; |
| }; |
| |
| Mux.prototype._scheduleRead = function() { |
| var self = this; |
| |
| if (!self.scheduledRead) { |
| schedule(function() { |
| self.scheduledRead = false; |
| self._readIncoming(); |
| }); |
| self.scheduledRead = true; |
| } |
| }; |
| |
| Mux.prototype.pipeFrom = function(readable) { |
| var self = this; |
| |
| function enqueue() { |
| self.newStreams.push(readable); |
| self._scheduleRead(); |
| } |
| |
| function cleanup() { |
| readable.removeListener('readable', enqueue); |
| readable.removeListener('error', cleanup); |
| readable.removeListener('end', cleanup); |
| readable.removeListener('unpipeFrom', cleanupIfMe); |
| } |
| function cleanupIfMe(dest) { |
| if (dest === self) cleanup(); |
| } |
| |
| readable.on('unpipeFrom', cleanupIfMe); |
| readable.on('end', cleanup); |
| readable.on('error', cleanup); |
| readable.on('readable', enqueue); |
| }; |
| |
| Mux.prototype.unpipeFrom = function(readable) { |
| readable.emit('unpipeFrom', this); |
| }; |
| |
| module.exports.Mux = Mux; |