| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| var http = require('http'); |
| var https = require('https'); |
| var url = require("url"); |
| var path = require("path"); |
| var fs = require("fs"); |
| var crypto = require("crypto"); |
| var log = require('./log'); |
| |
| var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcessor; |
| |
| var TBufferedTransport = require('./buffered_transport'); |
| var TBinaryProtocol = require('./binary_protocol'); |
| var InputBufferUnderrunError = require('./input_buffer_underrun_error'); |
| |
| // WSFrame constructor and prototype |
| ///////////////////////////////////////////////////////////////////// |
| |
| /** Apache Thrift RPC Web Socket Transport |
| * Frame layout conforming to RFC 6455 circa 12/2011 |
| * |
| * Theoretical frame size limit is 4GB*4GB, however the Node Buffer |
| * limit is 1GB as of v0.10. The frame length encoding is also |
| * configured for a max of 4GB presently and needs to be adjusted |
| * if Node/Browsers become capabile of > 4GB frames. |
| * |
| * - FIN is 1 if the message is complete |
| * - RSV1/2/3 are always 0 |
| * - Opcode is 1(TEXT) for TJSONProtocol and 2(BIN) for TBinaryProtocol |
| * - Mask Present bit is 1 sending to-server and 0 sending to-client |
| * - Payload Len: |
| * + If < 126: then represented directly |
| * + If >=126: but within range of an unsigned 16 bit integer |
| * then Payload Len is 126 and the two following bytes store |
| * the length |
| * + Else: Payload Len is 127 and the following 8 bytes store the |
| * length as an unsigned 64 bit integer |
| * - Masking key is a 32 bit key only present when sending to the server |
| * - Payload follows the masking key or length |
| * |
| * 0 1 2 3 |
| * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| * +-+-+-+-+-------+-+-------------+-------------------------------+ |
| * |F|R|R|R| opcode|M| Payload len | Extended payload length | |
| * |I|S|S|S| (4) |A| (7) | (16/64) | |
| * |N|V|V|V| |S| | (if payload len==126/127) | |
| * | |1|2|3| |K| | | |
| * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + |
| * | Extended payload length continued, if payload len == 127 | |
| * + - - - - - - - - - - - - - - - +-------------------------------+ |
| * | |Masking-key, if MASK set to 1 | |
| * +-------------------------------+-------------------------------+ |
| * | Masking-key (continued) | Payload Data | |
| * +-------------------------------- - - - - - - - - - - - - - - - + |
| * : Payload Data continued ... : |
| * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + |
| * | Payload Data continued ... | |
| * +---------------------------------------------------------------+ |
| */ |
| var wsFrame = { |
| /** Encodes a WebSocket frame |
| * |
| * @param {Buffer} data - The raw data to encode |
| * @param {Buffer} mask - The mask to apply when sending to server, null for no mask |
| * @param {Boolean} binEncoding - True for binary encoding, false for text encoding |
| * @returns {Buffer} - The WebSocket frame, ready to send |
| */ |
| encode: function(data, mask, binEncoding) { |
| var frame = new Buffer(wsFrame.frameSizeFromData(data, mask)); |
| //Byte 0 - FIN & OPCODE |
| frame[0] = wsFrame.fin.FIN + |
| (binEncoding ? wsFrame.frameOpCodes.BIN : wsFrame.frameOpCodes.TEXT); |
| //Byte 1 or 1-3 or 1-9 - MASK FLAG & SIZE |
| var payloadOffset = 2; |
| if (data.length < 0x7E) { |
| frame[1] = data.length + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT); |
| } else if (data.length < 0xFFFF) { |
| frame[1] = 0x7E + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT); |
| frame.writeUInt16BE(data.length, 2, true); |
| payloadOffset = 4; |
| } else { |
| frame[1] = 0x7F + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT); |
| frame.writeUInt32BE(0, 2, true); |
| frame.writeUInt32BE(data.length, 6, true); |
| payloadOffset = 10; |
| } |
| //MASK |
| if (mask) { |
| mask.copy(frame, payloadOffset, 0, 4); |
| payloadOffset += 4; |
| } |
| //Payload |
| data.copy(frame, payloadOffset); |
| if (mask) { |
| wsFrame.applyMask(frame.slice(payloadOffset), frame.slice(payloadOffset-4,payloadOffset)); |
| } |
| return frame; |
| }, |
| |
| /** |
| * @class |
| * @name WSDecodeResult |
| * @property {Buffer} data - The decoded data for the first ATRPC message |
| * @property {Buffer} mask - The frame mask |
| * @property {Boolean} binEncoding - True if binary (TBinaryProtocol), |
| * False if text (TJSONProtocol) |
| * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a |
| * single WebSocket frame, this Buffer contains |
| * any bytes remaining to be decoded |
| * @property {Boolean} FIN - True is the message is complete |
| */ |
| |
| /** Decodes a WebSocket frame |
| * |
| * @param {Buffer} frame - The raw inbound frame, if this is a continuation |
| * frame it must have a mask property with the mask. |
| * @returns {WSDecodeResult} - The decoded payload |
| * |
| * @see {@link WSDecodeResult} |
| */ |
| decode: function(frame) { |
| var result = { |
| data: null, |
| mask: null, |
| binEncoding: false, |
| nextFrame: null, |
| FIN: true |
| }; |
| |
| //Byte 0 - FIN & OPCODE |
| if (wsFrame.fin.FIN != (frame[0] & wsFrame.fin.FIN)) { |
| result.FIN = false; |
| } |
| result.binEncoding = (wsFrame.frameOpCodes.BIN == (frame[0] & wsFrame.frameOpCodes.BIN)); |
| //Byte 1 or 1-3 or 1-9 - SIZE |
| var lenByte = (frame[1] & 0x0000007F); |
| var len = lenByte; |
| var dataOffset = 2; |
| if (lenByte == 0x7E) { |
| len = frame.readUInt16BE(2); |
| dataOffset = 4; |
| } else if (lenByte == 0x7F) { |
| len = frame.readUInt32BE(6); |
| dataOffset = 10; |
| } |
| //MASK |
| if (wsFrame.mask.TO_SERVER == (frame[1] & wsFrame.mask.TO_SERVER)) { |
| result.mask = new Buffer(4); |
| frame.copy(result.mask, 0, dataOffset, dataOffset + 4); |
| dataOffset += 4; |
| } |
| //Payload |
| result.data = new Buffer(len); |
| frame.copy(result.data, 0, dataOffset, dataOffset+len); |
| if (result.mask) { |
| wsFrame.applyMask(result.data, result.mask); |
| } |
| //Next Frame |
| if (frame.length > dataOffset+len) { |
| result.nextFrame = new Buffer(frame.length - (dataOffset+len)); |
| frame.copy(result.nextFrame, 0, dataOffset+len, frame.length); |
| } |
| //Don't forward control frames |
| if (frame[0] & wsFrame.frameOpCodes.FINCTRL) { |
| result.data = null; |
| } |
| |
| return result; |
| }, |
| |
| /** Masks/Unmasks data |
| * |
| * @param {Buffer} data - data to mask/unmask in place |
| * @param {Buffer} mask - the mask |
| */ |
| applyMask: function(data, mask){ |
| //TODO: look into xoring words at a time |
| var dataLen = data.length; |
| var maskLen = mask.length; |
| for (var i = 0; i < dataLen; i++) { |
| data[i] = data[i] ^ mask[i%maskLen]; |
| } |
| }, |
| |
| /** Computes frame size on the wire from data to be sent |
| * |
| * @param {Buffer} data - data.length is the assumed payload size |
| * @param {Boolean} mask - true if a mask will be sent (TO_SERVER) |
| */ |
| frameSizeFromData: function(data, mask) { |
| var headerSize = 10; |
| if (data.length < 0x7E) { |
| headerSize = 2; |
| } else if (data.length < 0xFFFF) { |
| headerSize = 4; |
| } |
| return headerSize + data.length + (mask ? 4 : 0); |
| }, |
| |
| frameOpCodes: { |
| CONT: 0x00, |
| TEXT: 0x01, |
| BIN: 0x02, |
| CTRL: 0x80 |
| }, |
| |
| mask: { |
| TO_SERVER: 0x80, |
| TO_CLIENT: 0x00 |
| }, |
| |
| fin: { |
| CONT: 0x00, |
| FIN: 0x80 |
| } |
| }; |
| |
| |
| // createWebServer constructor and options |
| ///////////////////////////////////////////////////////////////////// |
| |
| /** |
| * @class |
| * @name ServerOptions |
| * @property {array} cors - Array of CORS origin strings to permit requests from. |
| * @property {string} files - Path to serve static files from, if absent or "" |
| * static file service is disabled. |
| * @property {object} headers - An object hash mapping header strings to header value |
| * strings, these headers are transmitted in response to |
| * static file GET operations. |
| * @property {object} services - An object hash mapping service URI strings |
| * to ServiceOptions objects |
| * @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html), |
| * if not present or null regular http is used, |
| * at least a key and a cert must be defined to use SSL/TLS |
| * @see {@link ServiceOptions} |
| */ |
| |
| /** |
| * @class |
| * @name ServiceOptions |
| * @property {object} transport - The layered transport to use (defaults |
| * to TBufferedTransport). |
| * @property {object} protocol - The serialization Protocol to use (defaults to |
| * TBinaryProtocol). |
| * @property {object} processor - The Thrift Service class/processor generated |
| * by the IDL Compiler for the service (the "cls" |
| * key can also be used for this attribute). |
| * @property {object} handler - The handler methods for the Thrift Service. |
| */ |
| |
| /** |
| * Create a Thrift server which can serve static files and/or one or |
| * more Thrift Services. |
| * @param {ServerOptions} options - The server configuration. |
| * @returns {object} - The Apache Thrift Web Server. |
| */ |
| exports.createWebServer = function(options) { |
| var baseDir = options.files; |
| var contentTypesByExtension = { |
| '.txt': 'text/plain', |
| '.html': 'text/html', |
| '.css': 'text/css', |
| '.xml': 'application/xml', |
| '.json': 'application/json', |
| '.js': 'application/javascript', |
| '.jpg': 'image/jpeg', |
| '.jpeg': 'image/jpeg', |
| '.gif': 'image/gif', |
| '.png': 'image/png', |
| '.svg': 'image/svg+xml' |
| }; |
| |
| //Setup all of the services |
| var services = options.services; |
| for (var uri in services) { |
| var svcObj = services[uri]; |
| |
| //Setup the processor |
| if (svcObj.processor instanceof MultiplexedProcessor) { |
| //Multiplex processors have pre embedded processor/handler pairs, save as is |
| svcObj.processor = svcObj.processor; |
| } else { |
| //For historical reasons Node.js supports processors passed in directly or via the |
| // IDL Compiler generated class housing the processor. Also, the options property |
| // for a Processor has been called both cls and processor at different times. We |
| // support any of the four possibilities here. |
| var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) : |
| (svcObj.cls.Processor || svcObj.cls); |
| //Processors can be supplied as constructed objects with handlers already embedded, |
| // if a handler is provided we construct a new processor, if not we use the processor |
| // object directly |
| if (svcObj.handler) { |
| svcObj.processor = new processor(svcObj.handler); |
| } else { |
| svcObj.processor = processor; |
| } |
| } |
| svcObj.transport = svcObj.transport ? svcObj.transport : TBufferedTransport; |
| svcObj.protocol = svcObj.protocol ? svcObj.protocol : TBinaryProtocol; |
| } |
| |
| //Verify CORS requirements |
| function VerifyCORSAndSetHeaders(request, response) { |
| if (request.headers.origin && options.cors) { |
| if (options.cors["*"] || options.cors[request.headers.origin]) { |
| //Allow, origin allowed |
| response.setHeader("access-control-allow-origin", request.headers.origin); |
| response.setHeader("access-control-allow-methods", "GET, POST, OPTIONS"); |
| response.setHeader("access-control-allow-headers", "content-type, accept"); |
| response.setHeader("access-control-max-age", "60"); |
| return true; |
| } else { |
| //Disallow, origin denied |
| return false; |
| } |
| } |
| //Allow, CORS is not in use |
| return true; |
| } |
| |
| |
| //Handle OPTIONS method (CORS) |
| /////////////////////////////////////////////////// |
| function processOptions(request, response) { |
| if (VerifyCORSAndSetHeaders(request, response)) { |
| response.writeHead("204", "No Content", {"content-length": 0}); |
| } else { |
| response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); |
| } |
| response.end(); |
| } |
| |
| |
| //Handle POST methods (TXHRTransport) |
| /////////////////////////////////////////////////// |
| function processPost(request, response) { |
| //Lookup service |
| var uri = url.parse(request.url).pathname; |
| var svc = services[uri]; |
| if (!svc) { |
| response.writeHead("403", "No Apache Thrift Service at " + uri, {}); |
| response.end(); |
| return; |
| } |
| |
| //Verify CORS requirements |
| if (!VerifyCORSAndSetHeaders(request, response)) { |
| response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); |
| response.end(); |
| return; |
| } |
| |
| //Process XHR payload |
| request.on('data', svc.transport.receiver(function(transportWithData) { |
| var input = new svc.protocol(transportWithData); |
| var output = new svc.protocol(new svc.transport(undefined, function(buf) { |
| try { |
| response.writeHead(200); |
| response.end(buf); |
| } catch (err) { |
| response.writeHead(500); |
| response.end(); |
| } |
| })); |
| |
| try { |
| svc.processor.process(input, output); |
| transportWithData.commitPosition(); |
| } catch (err) { |
| if (err instanceof InputBufferUnderrunError) { |
| transportWithData.rollbackPosition(); |
| } else { |
| response.writeHead(500); |
| response.end(); |
| } |
| } |
| })); |
| } |
| |
| |
| //Handle GET methods (Static Page Server) |
| /////////////////////////////////////////////////// |
| function processGet(request, response) { |
| //Undefined or empty base directory means do not serve static files |
| if (!baseDir || "" === baseDir) { |
| response.writeHead(404); |
| response.end(); |
| return; |
| } |
| |
| //Verify CORS requirements |
| if (!VerifyCORSAndSetHeaders(request, response)) { |
| response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); |
| response.end(); |
| return; |
| } |
| |
| //Locate the file requested and send it |
| var uri = url.parse(request.url).pathname; |
| var filename = path.resolve(path.join(baseDir, uri)); |
| |
| //Ensure the basedir path is not able to be escaped |
| if (filename.indexOf(baseDir) != 0) { |
| response.writeHead(400, "Invalid request path", {}); |
| response.end(); |
| return; |
| } |
| |
| fs.exists(filename, function(exists) { |
| if(!exists) { |
| response.writeHead(404); |
| response.end(); |
| return; |
| } |
| |
| if (fs.statSync(filename).isDirectory()) { |
| filename += '/index.html'; |
| } |
| |
| fs.readFile(filename, "binary", function(err, file) { |
| if (err) { |
| response.writeHead(500); |
| response.end(err + "\n"); |
| return; |
| } |
| var headers = {}; |
| var contentType = contentTypesByExtension[path.extname(filename)]; |
| if (contentType) { |
| headers["Content-Type"] = contentType; |
| } |
| for (var k in options.headers) { |
| headers[k] = options.headers[k]; |
| } |
| response.writeHead(200, headers); |
| response.write(file, "binary"); |
| response.end(); |
| }); |
| }); |
| } |
| |
| |
| //Handle WebSocket calls (TWebSocketTransport) |
| /////////////////////////////////////////////////// |
| function processWS(data, socket, svc, binEncoding) { |
| svc.transport.receiver(function(transportWithData) { |
| var input = new svc.protocol(transportWithData); |
| var output = new svc.protocol(new svc.transport(undefined, function(buf) { |
| try { |
| var frame = wsFrame.encode(buf, null, binEncoding); |
| socket.write(frame); |
| } catch (err) { |
| //TODO: Add better error processing |
| } |
| })); |
| |
| try { |
| svc.processor.process(input, output); |
| transportWithData.commitPosition(); |
| } |
| catch (err) { |
| if (err instanceof InputBufferUnderrunError) { |
| transportWithData.rollbackPosition(); |
| } |
| else { |
| //TODO: Add better error processing |
| } |
| } |
| })(data); |
| } |
| |
| //Create the server (HTTP or HTTPS) |
| var server = null; |
| if (options.tls) { |
| server = https.createServer(options.tls); |
| } else { |
| server = http.createServer(); |
| } |
| |
| //Wire up listeners for upgrade(to WebSocket) & request methods for: |
| // - GET static files, |
| // - POST XHR Thrift services |
| // - OPTIONS CORS requests |
| server.on('request', function(request, response) { |
| if (request.method === 'POST') { |
| processPost(request, response); |
| } else if (request.method === 'GET') { |
| processGet(request, response); |
| } else if (request.method === 'OPTIONS') { |
| processOptions(request, response); |
| } else { |
| response.writeHead(500); |
| response.end(); |
| } |
| }).on('upgrade', function(request, socket, head) { |
| //Lookup service |
| var svc; |
| try { |
| svc = services[Object.keys(services)[0]]; |
| } catch(e) { |
| socket.write("HTTP/1.1 403 No Apache Thrift Service available\r\n\r\n"); |
| return; |
| } |
| //Perform upgrade |
| var hash = crypto.createHash("sha1"); |
| hash.update(request.headers['sec-websocket-key'] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); |
| socket.write("HTTP/1.1 101 Switching Protocols\r\n" + |
| "Upgrade: websocket\r\n" + |
| "Connection: Upgrade\r\n" + |
| "Sec-WebSocket-Accept: " + hash.digest("base64") + "\r\n" + |
| "Sec-WebSocket-Origin: " + request.headers.origin + "\r\n" + |
| "Sec-WebSocket-Location: ws://" + request.headers.host + request.url + "\r\n" + |
| "\r\n"); |
| //Handle WebSocket traffic |
| var data = null; |
| socket.on('data', function(frame) { |
| try { |
| while (frame) { |
| var result = wsFrame.decode(frame); |
| //Prepend any existing decoded data |
| if (data) { |
| if (result.data) { |
| var newData = new Buffer(data.length + result.data.length); |
| data.copy(newData); |
| result.data.copy(newData, data.length); |
| result.data = newData; |
| } else { |
| result.data = data; |
| } |
| data = null; |
| } |
| //If this completes a message process it |
| if (result.FIN) { |
| processWS(result.data, socket, svc, result.binEncoding); |
| } else { |
| data = result.data; |
| } |
| //Prepare next frame for decoding (if any) |
| frame = result.nextFrame; |
| } |
| } catch(e) { |
| log.error('TWebSocketTransport Exception: ' + e); |
| socket.destroy(); |
| } |
| }); |
| }); |
| |
| //Return the server |
| return server; |
| }; |