// blob: 26003e6e0a071e101ff823aea9f34736fda47f66 [file] [log] [blame]
//
//
//
'use strict';
// A Mux is an object into which other readable streams may be piped;
// it then writes 'packets' from the upstreams to the given
// downstream.
var inherits = require('util').inherits;
var assert = require('assert');
// Deferral primitive used to postpone a read pass: `setImmediate` when
// the runtime provides it, `process.nextTick` otherwise.
var schedule;
if (typeof setImmediate === 'function') {
  schedule = setImmediate;
}
else {
  schedule = process.nextTick;
}
// Construct a Mux writing to `downstream`. `downstream` must offer
// `on(event, listener)` here and `write(chunk)` for the read pass.
function Mux(downstream) {
  // Streams queued by a 'readable' event and not yet read.
  this.newStreams = [];
  // Streams carried over from a read pass that ran out of outbound
  // capacity.
  this.oldStreams = [];
  // True while the downstream has refused further writes.
  this.blocked = false;
  // True while a deferred read pass is pending.
  this.scheduledRead = false;
  this.out = downstream;

  // Once the downstream has room again, clear the block and resume
  // reading straight away.
  function resumeOnDrain() {
    this.blocked = false;
    this._readIncoming();
  }
  downstream.on('drain', resumeOnDrain.bind(this));
}
// There are 2 states we can be in:
// - waiting for outbound capacity, which will be signalled by a
//   'drain' event on the downstream; or,
// - no packets to send, waiting for an inbound buffer to have
//   packets, which will be signalled by a 'readable' event
// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
// Run one read pass: copy chunks from the queued streams to `this.out`
// until every queue is empty or `this.out.write()` returns a falsy
// value. Takes no arguments and returns nothing; the outcome is
// recorded in `this.blocked`, `this.newStreams` and `this.oldStreams`.
// Throws an AssertionError if the queue invariants checked below do
// not hold.
Mux.prototype._readIncoming = function() {
  // We may be sent here speculatively, if an incoming stream has
  // become readable
  if (this.blocked) return;

  var accepting = true;
  var out = this.out;

  // Try to read a chunk from each stream in turn, until all streams
  // are empty, or we exhaust our ability to accept chunks.
  function roundrobin(streams) {
    var s;
    var chunk;
    // if there's just one incoming stream we don't have to
    // go through all the dequeue/enqueueing
    if (streams.length === 1) {
      s = streams.shift();
      while (accepting) {
        chunk = s.read();
        if (chunk === null) break;
        accepting = out.write(chunk);
      }
      // The stream only stays queued if we stopped for lack of
      // capacity; a stream that returned null is dropped.
      if (!accepting) streams.push(s);
    }
    else {
      while (accepting && (s = streams.shift())) {
        chunk = s.read();
        // A stream that yielded a chunk goes to the back of the queue;
        // one that returned null is dropped.
        if (chunk !== null) {
          accepting = out.write(chunk);
          streams.push(s);
        }
      }
    }
  }

  roundrobin(this.newStreams);

  // Either we exhausted the new queues, or we ran out of capacity. If
  // we ran out of capacity, all the remaining new streams (i.e.,
  // those with packets left) become old streams. This effectively
  // prioritises streams that keep their buffers close to empty over
  // those that are constantly near full.
  if (accepting) { // all new queues are exhausted, write as many as
                   // we can from the old streams
    // (actual, expected) order, so a failure reports the real queue
    // length as the offending value.
    assert.strictEqual(this.newStreams.length, 0);
    roundrobin(this.oldStreams);
  }
  else { // ran out of room
    assert(this.newStreams.length > 0, "Expect some new streams to remain");
    this.oldStreams = this.oldStreams.concat(this.newStreams);
    this.newStreams = [];
  }
  // We may have exhausted all the old queues, or run out of room;
  // either way, all we need to do is record whether we have capacity
  // or not, so any speculative reads will know
  this.blocked = !accepting;
};
// Request a deferred read pass via `schedule`. Requests made while one
// is already pending are ignored, so at most one deferred pass is
// outstanding at any time.
Mux.prototype._scheduleRead = function() {
  if (this.scheduledRead) return;

  var mux = this;
  schedule(function runDeferredRead() {
    // Clear the flag before reading so that requests made during the
    // pass are not ignored.
    mux.scheduledRead = false;
    mux._readIncoming();
  });
  this.scheduledRead = true;
};
// Start taking chunks from `readable`. Each 'readable' event queues
// the stream in `newStreams` and requests a deferred read pass. The
// listeners installed here are all removed again when `readable`
// emits 'end' or 'error', or 'unpipeFrom' with this mux as argument.
Mux.prototype.pipeFrom = function(readable) {
  var mux = this;

  function onReadable() {
    mux.newStreams.push(readable);
    mux._scheduleRead();
  }

  // Remove every listener in the reverse of the order it was added.
  function detach() {
    for (var i = bindings.length - 1; i >= 0; i--) {
      readable.removeListener(bindings[i][0], bindings[i][1]);
    }
  }

  // 'unpipeFrom' names the mux being detached; ignore it when it is
  // meant for some other mux.
  function onUnpipe(dest) {
    if (dest !== mux) return;
    detach();
  }

  var bindings = [
    ['unpipeFrom', onUnpipe],
    ['end', detach],
    ['error', detach],
    ['readable', onReadable]
  ];
  bindings.forEach(function(binding) {
    readable.on(binding[0], binding[1]);
  });
};
// Stop taking chunks from `readable`. This emits 'unpipeFrom' on the
// stream with this mux as the argument; the listener installed by
// `pipeFrom` then removes that mux's listeners.
Mux.prototype.unpipeFrom = function(readable) {
  var mux = this;
  readable.emit('unpipeFrom', mux);
};
// Expose the Mux constructor as the `Mux` property of this module's
// exports.
module.exports.Mux = Mux;