blob: dc08cac594fc4c145a37febf4690139de4e53313 [file] [log] [blame]
'use strict'
var transport = require('../../../spdy-transport')
var utils = transport.utils
var assert = require('assert')
var util = require('util')
var debug = require('debug')('spdy:scheduler')
var Readable = require('readable-stream').Readable
/*
 * We create the following structure in `this.list`:
 *
 * [ [ id = 0 ], [ id = 1 ], [ id = 2 ], [ id = 0 ] ]
 *     chunks      chunks      chunks      chunks
 *     chunks                  chunks
 *     chunks
 *
 * Then on the `.tick()` pass we pick one entry of chunks from each item and
 * remove the item if it becomes empty:
 *
 * [ [ id = 0 ], [ id = 2 ] ]
 *     chunks      chunks
 *     chunks
 *
 * Writing out: chunks for 0, chunks for 1, chunks for 2, chunks for 0
 *
 * This way data is interleaved between the different streams.
 */
/**
 * Readable stream that queues outgoing frames and pushes their chunks out,
 * interleaving data that belongs to different streams.
 *
 * @param {Object} [options]
 * @param {number} [options.window=0.25] - largest allowed difference between
 *   the priority of the head item and the priority of an item that may still
 *   be served during an async tick. An explicit `0` is honored.
 */
function Scheduler (options) {
  Readable.call(this)

  // Pretty big window by default
  this.window = 0.25
  // A plain truthiness check would silently discard an explicit `0`,
  // so that value is accepted separately
  if (options && (options.window || options.window === 0)) {
    this.window = options.window
  }

  // Frames scheduled with `priority === false`; flushed by `tickSync()`
  this.sync = []
  // Prioritized items; served by `tickAsync()`
  this.list = []
  // Number of chunks that are queued but not pushed yet
  this.count = 0
  // True while a `tick()` is already queued through `process.nextTick()`
  this.pendingTick = false
}
// Make Scheduler instances inherit from Readable
util.inherits(Scheduler, Readable)
// The constructor itself is what this module exports
module.exports = Scheduler
/**
 * Factory equivalent of `new Scheduler(options)`.
 * Just for testing, really.
 *
 * @param {Object} [options] - forwarded to the constructor unchanged
 * @returns {Scheduler} freshly constructed scheduler
 */
Scheduler.create = function create (options) {
  var scheduler = new Scheduler(options)
  return scheduler
}
/**
 * Ordering used for the prioritized list: higher `priority` sorts first,
 * and items with equal priority are ordered by ascending `stream`.
 *
 * @param {{priority: number, stream: number}} a
 * @param {{priority: number, stream: number}} b
 * @returns {number} negative when `a` sorts before `b`, positive when it
 *   sorts after, `0` when both priority and stream are equal
 */
function insertCompare (a, b) {
  if (a.priority !== b.priority) {
    return b.priority - a.priority
  }

  return a.stream - b.stream
}
/**
 * Queue a frame for writing and request a tick.
 *
 * Frames with `priority === false` go to the sync queue. Every other frame
 * is attached to the list item that has the same priority and stream, and a
 * new item is inserted at its sorted position when none exists yet.
 *
 * @param {Object} data - frame to queue
 * @param {number|boolean} data.priority - `false` for a sync frame
 * @param {number} data.stream - stream id the frame belongs to
 * @param {Array} data.chunks - chunks that will be pushed out for the frame
 */
Scheduler.prototype.schedule = function schedule (data) {
  var priority = data.priority
  var stream = data.stream
  var chunks = data.chunks

  if (priority === false) {
    // Synchronous frames should not be interleaved
    debug('queue sync', chunks)
    this.sync.push(data)
  } else {
    debug('queue async priority=%d stream=%d', priority, stream, chunks)

    var candidate = new SchedulerItem(stream, priority)
    var position = utils.binaryLookup(this.list, candidate, insertCompare)
    var found = position < this.list.length &&
      insertCompare(this.list[position], candidate) === 0

    var target
    if (found) {
      // Coalesce with the item that is already queued
      target = this.list[position]
    } else {
      // Push new item
      this.list.splice(position, 0, candidate)
      target = candidate
    }

    target.push(data)
  }

  this.count += chunks.length
  this._read()
}
/**
 * Request a `tick()` on the next turn of `process.nextTick()`.
 *
 * Does nothing when no chunks are queued or when a tick has already been
 * requested and has not run yet.
 */
Scheduler.prototype._read = function _read () {
  // Nothing to write, or a tick is already on its way
  if (this.count === 0 || this.pendingTick) {
    return
  }

  this.pendingTick = true

  var scheduler = this
  process.nextTick(function () {
    // Clear the flag first so that work queued by `tick()` can request
    // another one
    scheduler.pendingTick = false
    scheduler.tick()
  })
}
/**
 * Run one scheduling pass: flush the sync queue, then serve async items.
 *
 * @returns {*} `false` when `tickSync()` returned a falsy value (async items
 *   are not touched in that case), otherwise the result of `tickAsync()`
 */
Scheduler.prototype.tick = function tick () {
  var syncFlushed = this.tickSync()

  // No luck for async frames
  return syncFlushed ? this.tickAsync() : false
}
/**
 * Push out every chunk of every frame in the sync queue, in queue order,
 * invoking each frame's `callback` (if any) with `null` once its chunks
 * have been pushed.
 *
 * If `push()` throws, the exception is re-emitted as an `'error'` event and
 * the pass stops right there: frames that were not reached yet are not
 * pushed and are no longer in the queue.
 *
 * @returns {*} result of the last `push()` call, `true` when nothing was
 *   pushed, or `false` when `push()` threw
 */
Scheduler.prototype.tickSync = function tickSync () {
  var pushed = true

  // Empty sync queue first: detach it, so that anything scheduled while
  // flushing lands in a fresh array
  var frames = this.sync
  this.sync = []

  for (var f = 0; f < frames.length; f++) {
    var frame = frames[f]
    debug('tick sync pending=%d', this.count, frame.chunks)

    for (var c = 0; c < frame.chunks.length; c++) {
      this.count -= 1

      // TODO: handle stream backoff properly
      try {
        pushed = this.push(frame.chunks[c])
      } catch (err) {
        this.emit('error', err)
        return false
      }
    }

    debug('after tick sync pending=%d', this.count)

    // TODO(indutny): figure out the way to invoke callback on actual write
    if (frame.callback) {
      frame.callback(null)
    }
  }

  return pushed
}
/**
 * Serve the prioritized list round-robin, one queued frame per visited item,
 * until the list is empty or `push()` returns a falsy value.
 *
 * The list is walked from its head. When the visited item's priority is
 * more than `this.window` below the priority the pass started from, the
 * walk restarts at the head. An item is removed from the list as soon as
 * its last frame has been taken. Each frame's `callback` (if any) is
 * invoked with `null` after its chunks have been pushed.
 *
 * If `push()` throws, the exception is re-emitted as an `'error'` event and
 * the pass stops immediately.
 *
 * @returns {*} result of the last `push()` call, `true` when nothing was
 *   pushed, or `false` when `push()` threw
 */
Scheduler.prototype.tickAsync = function tickAsync () {
  var pushed = true
  var items = this.list

  if (items.length === 0) {
    return pushed
  }

  var headPriority = items[0].priority
  var cursor = 0

  while (items.length > 0) {
    // Wrap around once the end of the list has been passed
    cursor %= items.length

    // Too far below the starting priority: go back to the head
    if (headPriority - items[cursor].priority > this.window) {
      cursor = 0
    }

    debug('tick async index=%d start=%d', cursor, headPriority)

    var entry = items[cursor]
    var frame = entry.shift()

    if (entry.isEmpty()) {
      // Drained: drop the item. The cursor now already points at its
      // successor, so it must not be advanced.
      items.splice(cursor, 1)
      if (cursor === 0 && items.length > 0) {
        headPriority = items[0].priority
      }
    } else {
      cursor += 1
    }

    debug('tick async pending=%d', this.count, frame.chunks)

    for (var c = 0; c < frame.chunks.length; c++) {
      this.count -= 1

      // TODO: handle stream backoff properly
      try {
        pushed = this.push(frame.chunks[c])
      } catch (err) {
        this.emit('error', err)
        return false
      }
    }

    debug('after tick pending=%d', this.count)

    // TODO(indutny): figure out the way to invoke callback on actual write
    if (frame.callback) {
      frame.callback(null)
    }

    if (!pushed) {
      break
    }
  }

  return pushed
}
/**
 * Synchronously write out everything that is queued: one `tickSync()` pass
 * followed by `tickAsync()` passes until one of them returns a truthy value.
 *
 * @throws {AssertionError} when `count` is not `0` afterwards
 */
Scheduler.prototype.dump = function dump () {
  this.tickSync()

  // Write everything out
  var drained
  do {
    drained = this.tickAsync()
  } while (!drained)

  assert.strictEqual(this.count, 0)
}
/**
 * FIFO of frames that share the same stream and priority.
 *
 * @param {number} stream - stream id
 * @param {number} priority - priority of the frames kept in this item
 */
function SchedulerItem (stream, priority) {
  this.stream = stream
  this.priority = priority
  // Frames in arrival order; the oldest one sits at index 0
  this.queue = []
}

/**
 * Append a frame to the end of the queue.
 *
 * @param {Object} data - frame to queue
 */
SchedulerItem.prototype.push = function push (data) {
  this.queue.push(data)
}

/**
 * Remove and return the oldest queued frame.
 *
 * @returns {Object|undefined} the frame, or `undefined` when the queue is
 *   empty
 */
SchedulerItem.prototype.shift = function shift () {
  var oldest = this.queue.shift()
  return oldest
}

/**
 * @returns {boolean} `true` when no frames are queued
 */
SchedulerItem.prototype.isEmpty = function isEmpty () {
  var pending = this.queue.length
  return pending === 0
}