// blob: 07c638a5ec2edb2b1b49359763f617ccb559792a [file] [log] [blame]
'use strict'
var util = require('util')
var transport = require('../spdy-transport')
// Debug loggers created with the `debug` package, one namespace per
// connection role. The constructor picks one of them based on
// `options.isServer` and stores it as `state.debug`.
var debug = {
server: require('debug')('spdy:connection:server'),
client: require('debug')('spdy:connection:client')
}
var EventEmitter = require('events').EventEmitter
var Stream = transport.Stream
/**
 * Multiplexed SPDY/HTTP2 connection on top of `socket`.
 *
 * Builds all per-connection state on `this._spdyState` (protocol tables,
 * flow-control windows, stream and ping bookkeeping, parser and framer),
 * stores the socket as `this.socket` and then calls `this._init()`.
 *
 * @param {Object} socket - transport socket; wired up by `_init()`
 * @param {Object} options
 * @param {string} options.protocol - key used to look up the protocol
 *   implementation in `transport.protocol`
 * @param {boolean} options.isServer - role of this endpoint
 * @param {number} [options.maxStreams] - falls back to the protocol's
 *   `MAX_CONCURRENT_STREAMS` when falsy
 * @param {boolean} [options.autoSpdy31] - only honoured when the resolved
 *   protocol's name is not `'h2'`
 * @param {boolean} [options.acceptPush] - defaults to `!isServer`
 * @param {number|false} [options.maxChunk] - `false` means `Infinity`;
 *   `undefined` means the base `DEFAULT_MAX_CHUNK`
 * @param {number} [options.windowSize] - receive window size, falls back to
 *   `1 << 20` when falsy
 * @param {*} [options.headerCompression] - forwarded to
 *   `compressionPool.create()`
 */
function Connection (socket, options) {
  EventEmitter.call(this)

  var state = {}
  this._spdyState = state

  // NOTE: There's a big trick here. Connection is used as a `this` argument
  // to the wrapped `connection` event listener.
  // socket end doesn't necessarily mean connection drop
  this.httpAllowHalfOpen = true

  state.timeout = new transport.utils.Timeout(this)

  // Protocol info
  state.protocol = transport.protocol[options.protocol]
  state.version = null
  state.constants = state.protocol.constants
  state.pair = null
  state.isServer = options.isServer

  // Root of priority tree (i.e. stream id = 0)
  state.priorityRoot = new transport.Priority({
    defaultWeight: state.constants.DEFAULT_WEIGHT,
    maxCount: transport.protocol.base.constants.MAX_PRIORITY_STREAMS
  })

  // Defaults
  state.maxStreams = options.maxStreams ||
    state.constants.MAX_CONCURRENT_STREAMS

  // `options.protocol` is only the lookup key used above, so the protocol
  // name has to be read from the resolved protocol object (the same source
  // the `=== 'spdy'` check below uses), not from the key itself.
  state.autoSpdy31 = state.protocol.name !== 'h2' && options.autoSpdy31

  state.acceptPush = options.acceptPush === undefined
    ? !state.isServer
    : options.acceptPush

  if (options.maxChunk === false) {
    state.maxChunk = Infinity
  } else if (options.maxChunk === undefined) {
    state.maxChunk = transport.protocol.base.constants.DEFAULT_MAX_CHUNK
  } else {
    state.maxChunk = options.maxChunk
  }

  // Connection-level flow control
  var windowSize = options.windowSize || 1 << 20
  state.window = new transport.Window({
    id: 0,
    isServer: state.isServer,
    recv: {
      size: state.constants.DEFAULT_WINDOW,
      max: state.constants.MAX_INITIAL_WINDOW_SIZE
    },
    send: {
      size: state.constants.DEFAULT_WINDOW,
      max: state.constants.MAX_INITIAL_WINDOW_SIZE
    }
  })

  // It starts with DEFAULT_WINDOW, update must be sent to change it on client
  state.window.recv.setMax(windowSize)

  // Boilerplate for Stream constructor
  state.streamWindow = new transport.Window({
    id: -1,
    isServer: state.isServer,
    recv: {
      size: windowSize,
      max: state.constants.MAX_INITIAL_WINDOW_SIZE
    },
    send: {
      size: state.constants.DEFAULT_WINDOW,
      max: state.constants.MAX_INITIAL_WINDOW_SIZE
    }
  })

  // Various state info
  state.pool = state.protocol.compressionPool.create(options.headerCompression)
  state.counters = {
    push: 0,
    stream: 0
  }

  // Init streams list; servers start allocating ids at 2, clients at 1
  state.stream = {
    map: {},
    count: 0,
    nextId: state.isServer ? 2 : 1,
    lastId: {
      both: 0,
      received: 0
    }
  }
  state.ping = {
    nextId: state.isServer ? 2 : 1,
    map: {}
  }

  // `false` until a GOAWAY is sent or received, then the GOAWAY's last id
  state.goaway = false

  // Debug
  state.debug = state.isServer ? debug.server : debug.client

  // X-Forwarded feature
  state.xForward = null

  // Create parser and hole for framer
  state.parser = state.protocol.parser.create({
    // NOTE: needed to distinguish ping from ping ACK in SPDY
    isServer: state.isServer,
    window: state.window
  })
  state.framer = state.protocol.framer.create({
    window: state.window,
    timeout: state.timeout
  })

  // SPDY has PUSH enabled on servers
  if (state.protocol.name === 'spdy') {
    state.framer.enablePush(state.isServer)
  }

  if (!state.isServer) {
    state.parser.skipPreface()
  }

  this.socket = socket

  this._init()
}
// Connection inherits EventEmitter's prototype methods (emit/on/once ...),
// which the methods below rely on.
util.inherits(Connection, EventEmitter)
// Public export of the constructor
exports.Connection = Connection
/**
 * Factory shortcut for `new Connection(socket, options)`.
 *
 * @param {Object} socket - passed through to the constructor
 * @param {Object} options - passed through to the constructor
 * @returns {Connection} the freshly constructed connection
 */
Connection.create = function create (socket, options) {
  var connection = new Connection(socket, options)
  return connection
}
// Wires the connection together: subscribes to the session receive window,
// the parser, the framer and the socket, and pipes socket -> parser and
// framer -> socket. Called once from the constructor.
Connection.prototype._init = function init () {
var self = this
var state = this._spdyState
var pool = state.pool
// Initialize session window
state.window.recv.on('drain', function () {
self._onSessionWindowDrain()
})
// Initialize parser
state.parser.on('data', function (frame) {
self._handleFrame(frame)
})
// Only the first 'version' event is handled here (`once`)
state.parser.once('version', function (version) {
self._onVersion(version)
})
// Propagate parser errors
state.parser.on('error', function (err) {
self._onParserError(err)
})
// Propagate framer errors
state.framer.on('error', function (err) {
self.emit('error', err)
})
// Incoming bytes go to the parser, outgoing frames go to the socket
this.socket.pipe(state.parser)
state.framer.pipe(this.socket)
// Allow high-level api to catch socket errors
this.socket.on('error', function onSocketError (e) {
self.emit('error', e)
})
// On socket close: destroy every stream (passing an ECONNRESET
// 'socket hang up' error when the socket reported `hadError`), emit
// 'close', and hand the compression pair back to the pool if one was taken.
this.socket.once('close', function onclose (hadError) {
var err
if (hadError) {
err = new Error('socket hang up')
err.code = 'ECONNRESET'
}
self.destroyStreams(err)
self.emit('close')
if (state.pair) {
pool.put(state.pair)
}
// NOTE(review): presumably keeps the framer flowing once its destination
// is gone so queued frames are discarded - confirm against the framer
state.framer.resume()
})
// Reset timeout on close
this.once('close', function () {
self.setTimeout(0)
})
// Overflow of either direction of the session window is routed to
// `_onWindowOverflow()`
function _onWindowOverflow () {
self._onWindowOverflow()
}
state.window.recv.on('overflow', _onWindowOverflow)
state.window.send.on('overflow', _onWindowOverflow)
// Do not allow half-open connections
this.socket.allowHalfOpen = false
}
/**
 * Reacts to the protocol version reported by the parser.
 *
 * On the first version seen, a compression pair is taken from the pool and
 * installed on both parser and framer. The framer is then told the version,
 * clients send the preface (plus a pending X-Forwarded-For, if any), a
 * SETTINGS frame is sent, the session window is optionally updated, and
 * finally `'version'` is emitted.
 *
 * @param {number} version - protocol version
 */
Connection.prototype._onVersion = function _onVersion (version) {
  var state = this._spdyState
  var framer = state.framer
  var isFirstVersion = !state.version

  state.version = version
  state.debug('id=0 version=%d', version)

  // Ignore transition to 3.1
  if (isFirstVersion) {
    state.pair = state.pool.get(version)
    state.parser.setCompression(state.pair)
    framer.setCompression(state.pair)
  }

  framer.setVersion(version)

  var isClient = !state.isServer
  if (isClient) {
    framer.prefaceFrame()
    if (state.xForward !== null) {
      framer.xForwardedFor({ host: state.xForward })
    }
  }

  // Send preface+settings frame (once)
  var settings = {
    max_header_list_size: state.constants.DEFAULT_MAX_HEADER_LIST_SIZE,
    max_concurrent_streams: state.maxStreams,
    enable_push: state.acceptPush ? 1 : 0,
    initial_window_size: state.window.recv.max
  }
  framer.settingsFrame(settings)

  // Update session window
  var autoUpgrade = state.isServer && state.autoSpdy31
  if (state.version >= 3.1 || autoUpgrade) {
    this._onSessionWindowDrain()
  }

  this.emit('version', version)
}
/**
 * Handles an error raised by the parser: kills the parser, sends a GOAWAY
 * when the error is a `ProtocolError`, and re-emits the error on the
 * connection.
 *
 * @param {Error} err - error reported by the parser
 */
Connection.prototype._onParserError = function _onParserError (err) {
  var state = this._spdyState
  var ProtocolError = transport.protocol.base.utils.ProtocolError

  // Prevent further errors
  state.parser.kill()

  // Send GOAWAY
  var isProtocolError = err instanceof ProtocolError
  if (isProtocolError) {
    var goaway = {
      lastId: state.stream.lastId.both,
      code: err.code,
      extra: err.message,
      send: true
    }
    this._goaway(goaway)
  }

  this.emit('error', err)
}
/**
 * Dispatches one parsed frame.
 *
 * Session-level WINDOW_UPDATE frames and PUSH_PROMISE frames received by a
 * server are handled first. Frames carrying an id are routed to the matching
 * stream; HEADERS without a known stream create one via `_handleHeaders()`.
 * Everything else is treated as a connection-level frame.
 *
 * @param {Object} frame - parsed frame; `type` is always read, `id`,
 *   `delta`, `settings` and `host` depending on the type
 */
Connection.prototype._handleFrame = function _handleFrame (frame) {
  var state = this._spdyState

  state.debug('id=0 frame', frame)
  state.timeout.reset()

  // For testing purposes
  this.emit('frame', frame)

  var type = frame.type

  // Session window update
  if (type === 'WINDOW_UPDATE' && frame.id === 0) {
    if (state.version < 3.1 && state.autoSpdy31) {
      state.debug('id=0 switch version to 3.1')
      state.version = 3.1
      this.emit('version', 3.1)
    }
    state.window.send.update(frame.delta)
    return
  }

  // Servers must never receive PUSH_PROMISE
  if (state.isServer && type === 'PUSH_PROMISE') {
    state.debug('id=0 server PUSH_PROMISE')
    this._goaway({
      lastId: state.stream.lastId.both,
      code: 'PROTOCOL_ERROR',
      send: true
    })
    return
  }

  var stream
  if (frame.id !== undefined) {
    // Load created one
    stream = state.stream.map[frame.id]

    // Only HEADERS, PRIORITY and RST may reference a stream we don't know
    var mayLackStream = type === 'HEADERS' ||
      type === 'PRIORITY' ||
      type === 'RST'
    if (!stream && !mayLackStream) {
      // Other side should destroy the stream upon receiving GOAWAY
      if (this._isGoaway(frame.id)) {
        return
      }
      state.debug('id=0 stream=%d not found', frame.id)
      state.framer.rstFrame({ id: frame.id, code: 'INVALID_STREAM' })
      return
    }
  }

  // Create new stream
  if (!stream && type === 'HEADERS') {
    this._handleHeaders(frame)
    return
  }

  if (stream) {
    stream._handleFrame(frame)
    return
  }

  // Connection-level frames
  switch (type) {
    case 'SETTINGS':
      this._handleSettings(frame.settings)
      break
    case 'PING':
      this._handlePing(frame)
      break
    case 'GOAWAY':
      this._handleGoaway(frame)
      break
    case 'X_FORWARDED_FOR':
      // Set X-Forwarded-For only once
      if (state.xForward === null) {
        state.xForward = frame.host
      }
      break
    case 'ACK_SETTINGS':
      // TODO(indutny): handle it one day
      break
    case 'PRIORITY':
      // TODO(indutny): handle this
      break
    default:
      state.debug('id=0 unknown frame type: %s', frame.type)
  }
}
/**
 * Session window overflowed: terminate the connection by sending a GOAWAY
 * with code `FLOW_CONTROL_ERROR`.
 */
Connection.prototype._onWindowOverflow = function _onWindowOverflow () {
  var state = this._spdyState
  var goaway = {
    lastId: state.stream.lastId.both,
    code: 'FLOW_CONTROL_ERROR',
    send: true
  }

  state.debug('id=0 window overflow')
  this._goaway(goaway)
}
/**
 * Tells whether stream `id` lies beyond the last id accepted by a GOAWAY.
 *
 * @param {number} id - stream id to check
 * @returns {boolean} `true` when a GOAWAY was recorded and its last id is
 *   lower than `id`
 */
Connection.prototype._isGoaway = function _isGoaway (id) {
  var lastAccepted = this._spdyState.goaway
  return lastAccepted !== false && lastAccepted < id
}
/**
 * Allocates the next locally-initiated stream id and advances the counter
 * by two.
 *
 * @returns {number} the id that was allocated
 */
Connection.prototype._getId = function _getId () {
  var streams = this._spdyState.stream
  var allocated = streams.nextId

  streams.nextId = allocated + 2
  return allocated
}
/**
 * Creates a `Stream` for `uri` and, unless the connection is going away,
 * registers it in the stream map and counters.
 *
 * When `uri.id` is undefined a fresh id is allocated with `_getId()`. If
 * `uri.push` is set while push is not accepted, a PROTOCOL_ERROR GOAWAY is
 * sent and the stream is treated like a post-GOAWAY stream: it is returned
 * neither readable nor writable and is not registered.
 *
 * @param {Object} uri - stream description (`id`, `request`, `method`,
 *   `path`, `host`, `priority`, `headers`, `parent`, `readable`,
 *   `writable`, `push`)
 * @returns {Stream} the created stream
 */
Connection.prototype._createStream = function _createStream (uri) {
  var connection = this
  var state = this._spdyState

  var id = uri.id === undefined ? this._getId() : uri.id
  var refused = this._isGoaway(id)

  if (uri.push && !state.acceptPush) {
    state.debug('id=0 push disabled promisedId=%d', id)

    // Fatal error
    this._goaway({
      lastId: state.stream.lastId.both,
      code: 'PROTOCOL_ERROR',
      send: true
    })
    refused = true
  }

  var streamOptions = {
    id: id,
    request: uri.request !== false,
    method: uri.method,
    path: uri.path,
    host: uri.host,
    priority: uri.priority,
    headers: uri.headers,
    parent: uri.parent,
    readable: !refused && uri.readable,
    writable: !refused && uri.writable
  }
  var stream = new Stream(this, streamOptions)

  // Just an empty stream for API consistency
  if (refused) {
    return stream
  }

  var streams = state.stream
  streams.lastId.both = Math.max(streams.lastId.both, id)

  state.debug('id=0 add stream=%d', stream.id)
  streams.map[stream.id] = stream
  streams.count++
  state.counters.stream++
  // A non-null parent is counted as a push stream
  if (stream.parent !== null) {
    state.counters.push++
  }

  stream.once('close', function () {
    connection._removeStream(stream)
  })

  return stream
}
/**
 * Handles a HEADERS frame that does not belong to a known stream by
 * creating a new incoming stream and announcing it with a `'stream'` event.
 *
 * Frames whose id is not above the last received id are ignored; ids with
 * the same parity as locally allocated ids are reset with PROTOCOL_ERROR.
 * A stream nobody listens for is aborted.
 *
 * @param {Object} frame - HEADERS frame (`id`, `headers`, `priority`,
 *   `writable`, `fin`)
 * @returns {Stream|undefined} the new stream, or `undefined` when the frame
 *   was ignored, rejected or the stream was aborted
 */
Connection.prototype._handleHeaders = function _handleHeaders (frame) {
  var state = this._spdyState
  var streams = state.stream

  // Must be HEADERS frame after stream close
  if (frame.id <= streams.lastId.received) {
    return
  }

  // Someone is using our ids!
  var collidesWithLocalIds = (frame.id + streams.nextId) % 2 === 0
  if (collidesWithLocalIds) {
    state.framer.rstFrame({ id: frame.id, code: 'PROTOCOL_ERROR' })
    return
  }

  var headers = frame.headers
  var stream = this._createStream({
    id: frame.id,
    request: false,
    method: headers[':method'],
    path: headers[':path'],
    host: headers[':authority'],
    priority: frame.priority,
    headers: headers,
    writable: frame.writable
  })

  // GOAWAY
  if (this._isGoaway(stream.id)) {
    return
  }

  streams.lastId.received = Math.max(streams.lastId.received, stream.id)

  // TODO(indutny) handle stream limit
  var hasListeners = this.emit('stream', stream)
  if (!hasListeners) {
    // No listeners was set - abort the stream
    stream.abort()
    return
  }

  // Create fake frame to simulate end of the data
  if (frame.fin) {
    stream._handleFrame({ type: 'FIN', fin: true })
  }

  return stream
}
/**
 * Replenishes the session receive window by sending a WINDOW_UPDATE for
 * stream 0 with the window's current delta.
 *
 * Does nothing for versions below 3.1 (unless this is a server with
 * `autoSpdy31` enabled) or when the delta is zero.
 */
Connection.prototype._onSessionWindowDrain = function _onSessionWindowDrain () {
  var state = this._spdyState
  var recv = state.window.recv

  var beforeSpdy31 = state.version < 3.1
  var autoUpgrade = state.isServer && state.autoSpdy31
  if (beforeSpdy31 && !autoUpgrade) {
    return
  }

  var delta = recv.getDelta()
  if (delta === 0) {
    return
  }

  state.debug('id=0 session window drain, update by %d', delta)
  state.framer.windowUpdateFrame({ id: 0, delta: delta })
  recv.update(delta)
}
/**
 * Starts the connection by telling the parser which protocol version to use.
 *
 * @param {number} version - forwarded to `parser.setVersion()`
 */
Connection.prototype.start = function start (version) {
  var parser = this._spdyState.parser
  parser.setVersion(version)
}
/**
 * Mostly for testing.
 *
 * @returns {number|null} the current protocol version, `null` until one has
 *   been set
 */
Connection.prototype.getVersion = function getVersion () {
  var state = this._spdyState
  return state.version
}
/**
 * Applies a SETTINGS frame received from the other side.
 *
 * The frame is acknowledged first, then the default stream window, the
 * maximum frame size and the header table size are updated. A failing
 * header table update sends a PROTOCOL_ERROR GOAWAY and stops processing.
 * For non-SPDY protocols the framer's push flag follows `enable_push`.
 *
 * @param {Object} settings - parsed settings (`initial_window_size`,
 *   `max_frame_size`, `header_table_size`, `enable_push`)
 */
Connection.prototype._handleSettings = function _handleSettings (settings) {
  var state = this._spdyState
  var framer = state.framer

  framer.ackSettingsFrame()

  this._setDefaultWindow(settings)

  var maxFrameSize = settings.max_frame_size
  if (maxFrameSize) {
    framer.setMaxFrameSize(maxFrameSize)
  }

  // TODO(indutny): handle max_header_list_size
  var tableSize = settings.header_table_size
  if (tableSize) {
    try {
      state.pair.compress.updateTableSize(tableSize)
    } catch (e) {
      this._goaway({
        lastId: 0,
        code: 'PROTOCOL_ERROR',
        send: true
      })
      return
    }
  }

  // HTTP2 clients needs to enable PUSH streams explicitly
  if (state.protocol.name !== 'spdy') {
    var pushEnabled = settings.enable_push === undefined
      ? state.isServer
      : settings.enable_push === 1
    framer.enablePush(pushEnabled)
  }

  // TODO(indutny): handle max_concurrent_streams
}
/**
 * Applies `settings.initial_window_size` to the send side of the default
 * stream window and of every currently registered stream. No-op when the
 * setting is undefined.
 *
 * @param {Object} settings - parsed settings
 */
Connection.prototype._setDefaultWindow = function _setDefaultWindow (settings) {
  var size = settings.initial_window_size
  if (size === undefined) {
    return
  }

  var state = this._spdyState

  // Update defaults
  state.streamWindow.send.setMax(size)

  // Update existing streams
  var streams = state.stream.map
  var ids = Object.keys(streams)
  for (var i = 0; i < ids.length; i++) {
    streams[ids[i]]._spdyState.window.send.updateMax(size)
  }
}
/**
 * Handles a PING frame.
 *
 * A non-ACK ping is answered with an ACK carrying the same opaque data and
 * announced with a `'ping'` event. An ACK is matched (by the hex form of
 * its opaque data) against the pings sent via `ping()`; a matching entry is
 * removed and its callback, if any, is called with `null`.
 *
 * @param {Object} frame - PING frame (`ack`, `opaque`)
 */
Connection.prototype._handlePing = function handlePing (frame) {
  var state = this._spdyState

  // Handle incoming PING
  if (!frame.ack) {
    state.framer.pingFrame({ opaque: frame.opaque, ack: true })
    this.emit('ping', frame.opaque)
    return
  }

  // Handle reply PING
  var pending = state.ping.map
  var key = frame.opaque.toString('hex')
  var entry = pending[key]
  if (!entry) {
    return
  }

  delete pending[key]
  if (entry.cb) {
    entry.cb(null)
  }
}
/**
 * Handles a GOAWAY received from the other side: applies it locally without
 * sending a GOAWAY back.
 *
 * @param {Object} frame - GOAWAY frame (`lastId`, `code`)
 */
Connection.prototype._handleGoaway = function handleGoaway (frame) {
  var received = {
    lastId: frame.lastId,
    code: frame.code,
    send: false
  }
  this._goaway(received)
}
/**
 * Sends a PING frame and remembers `callback` until the matching ACK is
 * handled by `_handlePing()`.
 *
 * The opaque payload is `PING_OPAQUE_SIZE` zero bytes with the next ping id
 * written big-endian into its last four bytes; the id counter then advances
 * by two.
 *
 * @param {Function} [callback] - stored as the pending ping's `cb`
 */
Connection.prototype.ping = function ping (callback) {
  var state = this._spdyState
  var pings = state.ping

  // HTTP2 is using 8-byte opaque
  // (Buffer.alloc() already returns a zero-filled buffer)
  var opaque = Buffer.alloc(state.constants.PING_OPAQUE_SIZE)
  opaque.writeUInt32BE(pings.nextId, opaque.length - 4)
  pings.nextId += 2

  var key = opaque.toString('hex')
  pings.map[key] = { cb: callback }

  state.framer.pingFrame({ opaque: opaque, ack: false })
}
/**
 * Reads one of the connection counters.
 *
 * @param {string} name - counter name; the constructor defines `'push'` and
 *   `'stream'`
 * @returns {number|undefined} the counter value, `undefined` for unknown
 *   names
 */
Connection.prototype.getCounter = function getCounter (name) {
  var counters = this._spdyState.counters
  return counters[name]
}
/**
 * Creates a stream for `uri` without sending anything yet.
 *
 * The outcome is reported on the next tick: on success `callback(null,
 * stream)` is called when a callback was given; after GOAWAY the error goes
 * to `callback` or, without one, is emitted as `'error'` on the stream.
 *
 * @param {Object} uri - stream description passed to `_createStream()`
 * @param {Function} [callback] - `function (err, stream)`
 * @returns {Stream} the created stream (also returned after GOAWAY)
 */
Connection.prototype.reserveStream = function reserveStream (uri, callback) {
  var stream = this._createStream(uri)

  if (!this._isGoaway(stream.id)) {
    if (callback) {
      process.nextTick(function () {
        callback(null, stream)
      })
    }
    return stream
  }

  // GOAWAY
  var err = new Error('Can\'t send request after GOAWAY')
  process.nextTick(function () {
    if (callback) {
      callback(err)
      return
    }
    stream.emit('error', err)
  })
  return stream
}
/**
 * Reserves a stream for `uri` and sends it unless it was already sent.
 *
 * Errors from reserving or sending go to `callback` when one was given and
 * are otherwise emitted as `'error'` on the stream. On success
 * `callback(null, stream)` is called.
 *
 * @param {Object} uri - stream description passed to `reserveStream()`
 * @param {Function} [callback] - `function (err, stream)`
 * @returns {Stream} the stream returned by `reserveStream()`
 */
Connection.prototype.request = function request (uri, callback) {
  // Route a failure to the callback, or to the stream when there is none
  function fail (err) {
    return callback ? callback(err) : stream.emit('error', err)
  }

  function succeed () {
    if (callback) {
      callback(null, stream)
    }
  }

  var stream = this.reserveStream(uri, function (err) {
    if (err) {
      fail(err)
      return
    }

    if (stream._wasSent()) {
      succeed()
      return
    }

    stream.send(function (err) {
      if (err) {
        return fail(err)
      }
      succeed()
    })
  })

  return stream
}
/**
 * Unregisters a stream from the stream map and emits `'_streamDrain'` once
 * the stream count reaches zero.
 *
 * @param {Stream} stream - stream to remove (looked up by `stream.id`)
 */
Connection.prototype._removeStream = function _removeStream (stream) {
  var state = this._spdyState
  var streams = state.stream

  state.debug('id=0 remove stream=%d', stream.id)
  delete streams.map[stream.id]
  streams.count -= 1

  var drained = streams.count === 0
  if (drained) {
    this.emit('_streamDrain')
  }
}
/**
 * Puts the connection into GOAWAY state.
 *
 * Records `params.lastId`, aborts every registered stream with a higher id
 * (emitting `'error'` on it), optionally sends a GOAWAY frame, and then
 * either tears the connection down on the next tick (no streams left, or
 * code other than `'OK'`) or waits for `'_streamDrain'`.
 *
 * @param {Object} params
 * @param {number} params.lastId - highest stream id still accepted
 * @param {string} params.code - GOAWAY code; `'OK'` allows a graceful drain
 * @param {*} [params.extra] - forwarded to the GOAWAY frame
 * @param {boolean} params.send - whether to send a GOAWAY frame
 */
Connection.prototype._goaway = function _goaway (params) {
  var connection = this
  var state = this._spdyState

  state.goaway = params.lastId
  state.debug('id=0 goaway from=%d', state.goaway)

  // Abort every stream started after GOAWAY
  var ids = Object.keys(state.stream.map)
  for (var i = 0; i < ids.length; i++) {
    var stream = state.stream.map[ids[i]]
    if (stream.id <= params.lastId) {
      continue
    }
    stream.abort()
    stream.emit('error', new Error('New stream after GOAWAY'))
  }

  var finish = function () {
    var graceful = state.stream.count !== 0 && params.code === 'OK'
    if (graceful) {
      // Tear down once the remaining streams are gone
      connection.on('_streamDrain', connection._onStreamDrain)
      return
    }

    // Destroy socket if there are no streams
    // No further frames should be processed
    state.parser.kill()

    process.nextTick(function () {
      var err = new Error('Fatal error: ' + params.code)
      connection._onStreamDrain(err)
    })
  }

  if (!params.send) {
    finish()
    return
  }

  // Make sure that GOAWAY frame is sent before dumping framer
  state.framer.goawayFrame({
    lastId: params.lastId,
    code: params.code,
    extra: params.extra
  }, finish)
}
/**
 * Final teardown: dumps the framer, detaches it from the socket, resumes
 * it, asks the socket to `destroySoon()` (when it has that method) and
 * emits `'close'`.
 *
 * @param {Error} [error] - passed along with the `'close'` event
 */
Connection.prototype._onStreamDrain = function _onStreamDrain (error) {
  var state = this._spdyState
  var framer = state.framer
  var socket = this.socket

  state.debug('id=0 _onStreamDrain')

  framer.dump()
  framer.unpipe(socket)
  framer.resume()

  if (socket.destroySoon) {
    socket.destroySoon()
  }

  this.emit('close', error)
}
/**
 * Ends the connection gracefully by sending a GOAWAY with code `'OK'`.
 *
 * @param {Function} [callback] - registered as a one-time `'close'` listener
 */
Connection.prototype.end = function end (callback) {
  var state = this._spdyState

  if (callback) {
    this.once('close', callback)
  }

  var goaway = {
    lastId: state.stream.lastId.both,
    code: 'OK',
    send: true
  }
  this._goaway(goaway)
}
/**
 * Destroys every registered stream, emitting `'error'` with `err` on each
 * of them when `err` is given.
 *
 * @param {Error} [err] - error to emit on every stream after destroying it
 */
Connection.prototype.destroyStreams = function destroyStreams (err) {
  var streams = this._spdyState.stream.map
  var ids = Object.keys(streams)

  for (var i = 0; i < ids.length; i++) {
    var stream = streams[ids[i]]
    stream.destroy()
    if (err) {
      stream.emit('error', err)
    }
  }
}
/**
 * @returns {boolean} the `isServer` flag the connection was created with
 */
Connection.prototype.isServer = function isServer () {
  var state = this._spdyState
  return state.isServer
}
/**
 * @returns {string|null} the stored X-Forwarded-For host, `null` when none
 *   has been received or queued
 */
Connection.prototype.getXForwardedFor = function getXForwardFor () {
  var state = this._spdyState
  return state.xForward
}
/**
 * Sends an X-Forwarded-For frame for `host`, or stores the host for later
 * when the protocol version is not known yet.
 *
 * @param {string} host - value to send or store
 */
Connection.prototype.sendXForwardedFor = function sendXForwardedFor (host) {
  var state = this._spdyState

  // Version not negotiated yet: remember the host instead of sending it
  if (state.version === null) {
    state.xForward = host
    return
  }

  state.framer.xForwardedFor({ host: host })
}
/**
 * Creates a push stream under `parent` and sends its PUSH_PROMISE.
 *
 * Failures (connection going away, `uri.push` set while push is not
 * accepted, or an error from `stream._sendPush()`) are delivered to
 * `callback` when one was given and are otherwise emitted as `'error'` on
 * the stream. On success `callback(null, stream)` is called.
 *
 * @param {Stream} parent - stream the push is associated with
 * @param {Object} uri - push description (`method`, `path`, `host`,
 *   `priority`, `headers`, `status`, `response`, `push`)
 * @param {Function} [callback] - `function (err, stream)`
 * @returns {Stream} the created push stream
 */
Connection.prototype.pushPromise = function pushPromise (parent, uri, callback) {
  var state = this._spdyState

  var stream = this._createStream({
    request: false,
    parent: parent,
    method: uri.method,
    path: uri.path,
    host: uri.host,
    priority: uri.priority,
    headers: uri.headers,
    readable: false
  })

  // Reports `err` on the next tick and hands the stream back to the caller
  function rejectLater (err) {
    process.nextTick(function () {
      if (callback) {
        callback(err)
      } else {
        stream.emit('error', err)
      }
    })
    return stream
  }

  // TODO(indutny): deduplicate this logic somehow
  if (this._isGoaway(stream.id)) {
    return rejectLater(new Error('Can\'t send PUSH_PROMISE after GOAWAY'))
  }

  if (uri.push && !state.acceptPush) {
    return rejectLater(new Error(
      'Can\'t send PUSH_PROMISE, other side won\'t accept it'))
  }

  stream._sendPush(uri.status, uri.response, function (err) {
    if (callback) {
      if (err) {
        return callback(err)
      }
      callback(null, stream)
      return
    }

    if (err) {
      stream.emit('error', err)
    }
  })

  return stream
}
/**
 * Configures the connection timeout by delegating to the `Timeout` helper
 * created in the constructor.
 *
 * @param {number} delay - forwarded to `timeout.set()`
 * @param {Function} [callback] - forwarded to `timeout.set()`
 */
Connection.prototype.setTimeout = function setTimeout (delay, callback) {
  var timeout = this._spdyState.timeout
  timeout.set(delay, callback)
}