// blob: a33f47aedb7da803486607a01d391f0060e5318b
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
var http = require('http');
var https = require('https');
var url = require("url");
var path = require("path");
var fs = require("fs");
var crypto = require("crypto");
var log = require('./log');
var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcessor;
var TBufferedTransport = require('./buffered_transport');
var TBinaryProtocol = require('./binary_protocol');
var InputBufferUnderrunError = require('./input_buffer_underrun_error');
// WSFrame constructor and prototype
/////////////////////////////////////////////////////////////////////
/** Apache Thrift RPC Web Socket Transport
* Frame layout conforming to RFC 6455 circa 12/2011
*
* Theoretical frame size limit is 4GB*4GB, however the Node Buffer
* limit is 1GB as of v0.10. The frame length encoding is also
* configured for a max of 4GB presently and needs to be adjusted
* if Node/Browsers become capable of > 4GB frames.
*
* - FIN is 1 if the message is complete
* - RSV1/2/3 are always 0
* - Opcode is 1(TEXT) for TJSONProtocol and 2(BIN) for TBinaryProtocol
* - Mask Present bit is 1 sending to-server and 0 sending to-client
* - Payload Len:
* + If < 126: then represented directly
* + If >=126: but within range of an unsigned 16 bit integer
* then Payload Len is 126 and the two following bytes store
* the length
* + Else: Payload Len is 127 and the following 8 bytes store the
* length as an unsigned 64 bit integer
* - Masking key is a 32 bit key only present when sending to the server
* - Payload follows the masking key or length
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-------+-+-------------+-------------------------------+
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
* |I|S|S|S| (4) |A| (7) | (16/64) |
* |N|V|V|V| |S| | (if payload len==126/127) |
* | |1|2|3| |K| | |
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
* | Extended payload length continued, if payload len == 127 |
* + - - - - - - - - - - - - - - - +-------------------------------+
* | |Masking-key, if MASK set to 1 |
* +-------------------------------+-------------------------------+
* | Masking-key (continued) | Payload Data |
* +-------------------------------- - - - - - - - - - - - - - - - +
* : Payload Data continued ... :
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
* | Payload Data continued ... |
* +---------------------------------------------------------------+
*/
var wsFrame = {
  /** Encodes a WebSocket frame
   *
   * The frame is always written with the FIN bit set and with the shortest
   * length encoding that can hold data.length, as RFC 6455 requires.
   *
   * @param {Buffer} data - The raw data to encode
   * @param {Buffer} mask - The mask to apply when sending to server, null for no mask
   *                        (only the first 4 bytes are used)
   * @param {Boolean} binEncoding - True for binary encoding, false for text encoding
   * @returns {Buffer} - The WebSocket frame, ready to send
   */
  encode: function(data, mask, binEncoding) {
    var frame = Buffer.alloc(wsFrame.frameSizeFromData(data, mask));
    //Byte 0 - FIN & OPCODE
    frame[0] = wsFrame.fin.FIN +
      (binEncoding ? wsFrame.frameOpCodes.BIN : wsFrame.frameOpCodes.TEXT);
    //Byte 1 or 1-3 or 1-9 - MASK FLAG & SIZE
    var maskFlag = mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT;
    var payloadOffset = 2;
    if (data.length < 0x7E) {
      frame[1] = data.length + maskFlag;
    } else if (data.length <= 0xFFFF) {
      //0xFFFF itself still fits the unsigned 16 bit form
      frame[1] = 0x7E + maskFlag;
      frame.writeUInt16BE(data.length, 2);
      payloadOffset = 4;
    } else {
      //64 bit length, written as two big endian 32 bit words (high, then low)
      frame[1] = 0x7F + maskFlag;
      frame.writeUInt32BE(Math.floor(data.length / 0x100000000), 2);
      frame.writeUInt32BE(data.length >>> 0, 6);
      payloadOffset = 10;
    }
    //MASK
    if (mask) {
      mask.copy(frame, payloadOffset, 0, 4);
      payloadOffset += 4;
    }
    //Payload
    data.copy(frame, payloadOffset);
    if (mask) {
      //Mask in place using the 4 key bytes just written into the frame
      wsFrame.applyMask(frame.slice(payloadOffset), frame.slice(payloadOffset - 4, payloadOffset));
    }
    return frame;
  },

  /**
   * @class
   * @name WSDecodeResult
   * @property {Buffer} data - The decoded data for the first ATRPC message,
   *                           null when the frame is a control frame
   * @property {Buffer} mask - The frame mask, null when the frame is unmasked
   * @property {Boolean} binEncoding - True if binary (TBinaryProtocol),
   *                                   False if text (TJSONProtocol)
   * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a
   *                                single WebSocket frame, this Buffer contains
   *                                any bytes remaining to be decoded
   * @property {Boolean} FIN - True if the message is complete
   */

  /** Decodes a WebSocket frame
   *
   * Only the first frame found in the buffer is decoded, any bytes that
   * follow it are returned in nextFrame. The buffer is assumed to hold the
   * complete frame; a payload split across several buffers is not reassembled
   * here.
   *
   * @param {Buffer} frame - The raw inbound frame, if this is a continuation
   *                         frame it must have a mask property with the mask.
   * @returns {WSDecodeResult} - The decoded payload
   *
   * @see {@link WSDecodeResult}
   */
  decode: function(frame) {
    var result = {
      data: null,
      mask: null,
      binEncoding: false,
      nextFrame: null,
      FIN: true
    };

    //Byte 0 - FIN & OPCODE
    var opcode = frame[0] & wsFrame.frameOpCodes.OPCODE_MASK;
    if (wsFrame.fin.FIN != (frame[0] & wsFrame.fin.FIN)) {
      result.FIN = false;
    }
    result.binEncoding = (opcode === wsFrame.frameOpCodes.BIN);

    //Byte 1 or 1-3 or 1-9 - SIZE
    var lenByte = (frame[1] & 0x0000007F);
    var len = lenByte;
    var dataOffset = 2;
    if (lenByte == 0x7E) {
      len = frame.readUInt16BE(2);
      dataOffset = 4;
    } else if (lenByte == 0x7F) {
      //64 bit length, combine the high and low 32 bit words
      len = frame.readUInt32BE(2) * 0x100000000 + frame.readUInt32BE(6);
      dataOffset = 10;
    }

    //MASK
    if (wsFrame.mask.TO_SERVER == (frame[1] & wsFrame.mask.TO_SERVER)) {
      result.mask = Buffer.alloc(4);
      frame.copy(result.mask, 0, dataOffset, dataOffset + 4);
      dataOffset += 4;
    }

    //Payload (zero filled so a short frame never exposes stale memory)
    result.data = Buffer.alloc(len);
    frame.copy(result.data, 0, dataOffset, dataOffset + len);
    if (result.mask) {
      wsFrame.applyMask(result.data, result.mask);
    }

    //Next Frame
    if (frame.length > dataOffset + len) {
      result.nextFrame = Buffer.alloc(frame.length - (dataOffset + len));
      frame.copy(result.nextFrame, 0, dataOffset + len, frame.length);
    }

    //Don't forward control frames (close/ping/pong), RFC 6455 marks them
    // with the most significant bit of the 4 bit opcode
    if (opcode & wsFrame.frameOpCodes.CTRL_MASK) {
      result.data = null;
    }

    return result;
  },

  /** Masks/Unmasks data
   *
   * XORs every data byte with the mask byte at the same position modulo the
   * mask length; applying the same mask twice restores the original bytes.
   *
   * @param {Buffer} data - data to mask/unmask in place
   * @param {Buffer} mask - the mask
   */
  applyMask: function(data, mask) {
    //TODO: look into xoring words at a time
    var dataLen = data.length;
    var maskLen = mask.length;
    for (var i = 0; i < dataLen; i++) {
      data[i] = data[i] ^ mask[i % maskLen];
    }
  },

  /** Computes frame size on the wire from data to be sent
   *
   * @param {Buffer} data - data.length is the assumed payload size
   * @param {Boolean} mask - truthy if a mask will be sent (TO_SERVER)
   * @returns {Number} - header size + payload size + 4 mask bytes when masked
   */
  frameSizeFromData: function(data, mask) {
    //Must pick the same length form as encode()
    var headerSize = 10;
    if (data.length < 0x7E) {
      headerSize = 2;
    } else if (data.length <= 0xFFFF) {
      headerSize = 4;
    }
    return headerSize + data.length + (mask ? 4 : 0);
  },

  frameOpCodes: {
    CONT: 0x00,
    TEXT: 0x01,
    BIN: 0x02,
    CLOSE: 0x08,
    PING: 0x09,
    PONG: 0x0A,
    //Retained unchanged from earlier versions; not used to classify frames
    CTRL: 0x80,
    //Low nibble of byte 0 holds the opcode
    OPCODE_MASK: 0x0F,
    //Opcodes with this bit set are control frames
    CTRL_MASK: 0x08
  },

  mask: {
    TO_SERVER: 0x80,
    TO_CLIENT: 0x00
  },

  fin: {
    CONT: 0x00,
    FIN: 0x80
  }
};
// createWebServer constructor and options
/////////////////////////////////////////////////////////////////////
/**
* @class
* @name ServerOptions
* @property {object} cors - An object hash whose keys are the CORS origin strings to
* permit requests from (with truthy values), the key "*"
* permits any origin.
* @property {string} files - Path to serve static files from, if absent or ""
* static file service is disabled.
* @property {object} headers - An object hash mapping header strings to header value
* strings, these headers are transmitted in response to
* static file GET operations.
* @property {object} services - An object hash mapping service URI strings
* to ServiceOptions objects
* @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html),
* if not present or null regular http is used,
* at least a key and a cert must be defined to use SSL/TLS
* @see {@link ServiceOptions}
*/
/**
* @class
* @name ServiceOptions
* @property {object} transport - The layered transport to use (defaults
* to TBufferedTransport).
* @property {object} protocol - The serialization Protocol to use (defaults to
* TBinaryProtocol).
* @property {object} processor - The Thrift Service class/processor generated
* by the IDL Compiler for the service (the "cls"
* key can also be used for this attribute).
* @property {object} handler - The handler methods for the Thrift Service.
*/
/**
* Create a Thrift server which can serve static files and/or one or
* more Thrift Services.
* @param {ServerOptions} options - The server configuration.
* @returns {object} - The Apache Thrift Web Server.
*/
exports.createWebServer = function(options) {
var baseDir = options.files;
var contentTypesByExtension = {
'.txt': 'text/plain',
'.html': 'text/html',
'.css': 'text/css',
'.xml': 'application/xml',
'.json': 'application/json',
'.js': 'application/javascript',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.png': 'image/png',
'.svg': 'image/svg+xml'
};
//Setup all of the services
var services = options.services;
for (var uri in services) {
var svcObj = services[uri];
//Setup the processor
if (svcObj.processor instanceof MultiplexedProcessor) {
//Multiplex processors have pre embedded processor/handler pairs, save as is
svcObj.processor = svcObj.processor;
} else {
//For historical reasons Node.js supports processors passed in directly or via the
// IDL Compiler generated class housing the processor. Also, the options property
// for a Processor has been called both cls and processor at different times. We
// support any of the four possibilities here.
var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) :
(svcObj.cls.Processor || svcObj.cls);
//Processors can be supplied as constructed objects with handlers already embedded,
// if a handler is provided we construct a new processor, if not we use the processor
// object directly
if (svcObj.handler) {
svcObj.processor = new processor(svcObj.handler);
} else {
svcObj.processor = processor;
}
}
svcObj.transport = svcObj.transport ? svcObj.transport : TBufferedTransport;
svcObj.protocol = svcObj.protocol ? svcObj.protocol : TBinaryProtocol;
}
//Verify CORS requirements
function VerifyCORSAndSetHeaders(request, response) {
if (request.headers.origin && options.cors) {
if (options.cors["*"] || options.cors[request.headers.origin]) {
//Allow, origin allowed
response.setHeader("access-control-allow-origin", request.headers.origin);
response.setHeader("access-control-allow-methods", "GET, POST, OPTIONS");
response.setHeader("access-control-allow-headers", "content-type, accept");
response.setHeader("access-control-max-age", "60");
return true;
} else {
//Disallow, origin denied
return false;
}
}
//Allow, CORS is not in use
return true;
}
//Handle OPTIONS method (CORS)
///////////////////////////////////////////////////
function processOptions(request, response) {
if (VerifyCORSAndSetHeaders(request, response)) {
response.writeHead("204", "No Content", {"content-length": 0});
} else {
response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {});
}
response.end();
}
//Handle POST methods (TXHRTransport)
///////////////////////////////////////////////////
function processPost(request, response) {
//Lookup service
var uri = url.parse(request.url).pathname;
var svc = services[uri];
if (!svc) {
response.writeHead("403", "No Apache Thrift Service at " + uri, {});
response.end();
return;
}
//Verify CORS requirements
if (!VerifyCORSAndSetHeaders(request, response)) {
response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {});
response.end();
return;
}
//Process XHR payload
request.on('data', svc.transport.receiver(function(transportWithData) {
var input = new svc.protocol(transportWithData);
var output = new svc.protocol(new svc.transport(undefined, function(buf) {
try {
response.writeHead(200);
response.end(buf);
} catch (err) {
response.writeHead(500);
response.end();
}
}));
try {
svc.processor.process(input, output);
transportWithData.commitPosition();
} catch (err) {
if (err instanceof InputBufferUnderrunError) {
transportWithData.rollbackPosition();
} else {
response.writeHead(500);
response.end();
}
}
}));
}
//Handle GET methods (Static Page Server)
///////////////////////////////////////////////////
function processGet(request, response) {
//Undefined or empty base directory means do not serve static files
if (!baseDir || "" === baseDir) {
response.writeHead(404);
response.end();
return;
}
//Verify CORS requirements
if (!VerifyCORSAndSetHeaders(request, response)) {
response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {});
response.end();
return;
}
//Locate the file requested and send it
var uri = url.parse(request.url).pathname;
var filename = path.resolve(path.join(baseDir, uri));
//Ensure the basedir path is not able to be escaped
if (filename.indexOf(baseDir) != 0) {
response.writeHead(400, "Invalid request path", {});
response.end();
return;
}
fs.exists(filename, function(exists) {
if(!exists) {
response.writeHead(404);
response.end();
return;
}
if (fs.statSync(filename).isDirectory()) {
filename += '/index.html';
}
fs.readFile(filename, "binary", function(err, file) {
if (err) {
response.writeHead(500);
response.end(err + "\n");
return;
}
var headers = {};
var contentType = contentTypesByExtension[path.extname(filename)];
if (contentType) {
headers["Content-Type"] = contentType;
}
for (var k in options.headers) {
headers[k] = options.headers[k];
}
response.writeHead(200, headers);
response.write(file, "binary");
response.end();
});
});
}
//Handle WebSocket calls (TWebSocketTransport)
///////////////////////////////////////////////////
function processWS(data, socket, svc, binEncoding) {
svc.transport.receiver(function(transportWithData) {
var input = new svc.protocol(transportWithData);
var output = new svc.protocol(new svc.transport(undefined, function(buf) {
try {
var frame = wsFrame.encode(buf, null, binEncoding);
socket.write(frame);
} catch (err) {
//TODO: Add better error processing
}
}));
try {
svc.processor.process(input, output);
transportWithData.commitPosition();
}
catch (err) {
if (err instanceof InputBufferUnderrunError) {
transportWithData.rollbackPosition();
}
else {
//TODO: Add better error processing
}
}
})(data);
}
//Create the server (HTTP or HTTPS)
var server = null;
if (options.tls) {
server = https.createServer(options.tls);
} else {
server = http.createServer();
}
//Wire up listeners for upgrade(to WebSocket) & request methods for:
// - GET static files,
// - POST XHR Thrift services
// - OPTIONS CORS requests
server.on('request', function(request, response) {
if (request.method === 'POST') {
processPost(request, response);
} else if (request.method === 'GET') {
processGet(request, response);
} else if (request.method === 'OPTIONS') {
processOptions(request, response);
} else {
response.writeHead(500);
response.end();
}
}).on('upgrade', function(request, socket, head) {
//Lookup service
var svc;
try {
svc = services[Object.keys(services)[0]];
} catch(e) {
socket.write("HTTP/1.1 403 No Apache Thrift Service available\r\n\r\n");
return;
}
//Perform upgrade
var hash = crypto.createHash("sha1");
hash.update(request.headers['sec-websocket-key'] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
socket.write("HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " + hash.digest("base64") + "\r\n" +
"Sec-WebSocket-Origin: " + request.headers.origin + "\r\n" +
"Sec-WebSocket-Location: ws://" + request.headers.host + request.url + "\r\n" +
"\r\n");
//Handle WebSocket traffic
var data = null;
socket.on('data', function(frame) {
try {
while (frame) {
var result = wsFrame.decode(frame);
//Prepend any existing decoded data
if (data) {
if (result.data) {
var newData = new Buffer(data.length + result.data.length);
data.copy(newData);
result.data.copy(newData, data.length);
result.data = newData;
} else {
result.data = data;
}
data = null;
}
//If this completes a message process it
if (result.FIN) {
processWS(result.data, socket, svc, result.binEncoding);
} else {
data = result.data;
}
//Prepare next frame for decoding (if any)
frame = result.nextFrame;
}
} catch(e) {
log.error('TWebSocketTransport Exception: ' + e);
socket.destroy();
}
});
});
//Return the server
return server;
};