| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| var util = require('util'); |
| var EventEmitter = require('events').EventEmitter; |
| var constants = require('constants'); |
| var net = require('net'); |
| var tls = require('tls'); |
| var thrift = require('./thrift'); |
| var log = require('./log'); |
| |
| var TBufferedTransport = require('./buffered_transport'); |
| var TBinaryProtocol = require('./binary_protocol'); |
| var InputBufferUnderrunError = require('./input_buffer_underrun_error'); |
| |
| var createClient = require('./create_client'); |
| |
| var binary = require('./binary'); |
| |
/**
 * Thrift client connection layered on top of an already-created stream.
 *
 * The constructor wires listeners onto `stream` and re-emits its lifecycle
 * on this object:
 *   - "connect"  once the stream reports "connect" (or "secureConnect" when
 *                `stream.encrypted` is truthy); queued writes are flushed first.
 *   - "error"    for stream errors and for failures while decoding responses.
 *   - "timeout"  when the stream reports "timeout".
 * A stream "close" is routed to connection_gone().
 *
 * @param {Object} stream - stream to talk over; it must support
 *     addListener(), setTimeout() and setNoDelay().
 * @param {Object} [options]
 * @param {Function} [options.transport] - transport class, defaults to TBufferedTransport.
 * @param {Function} [options.protocol] - protocol class, defaults to TBinaryProtocol.
 * @param {boolean} [options.debug] - stored as `_debug`.
 * @param {number} [options.timeout] - passed to stream.setTimeout() on connect (0 if unset).
 * @param {number} [options.max_attempts] - positive number; stored as `max_attempts`.
 * @param {number} [options.retry_max_delay] - positive number; stored as `retry_max_delay`.
 * @param {number} [options.connect_timeout] - positive number; stored as `connect_timeout`.
 */
var Connection = exports.Connection = function(stream, options) {
  var self = this;
  EventEmitter.call(this);

  this.seqId2Service = {};
  this.connection = stream;
  this.ssl = (stream.encrypted);
  this.options = options || {};
  this.transport = this.options.transport || TBufferedTransport;
  this.protocol = this.options.protocol || TBinaryProtocol;
  this.offline_queue = [];
  this.connected = false;
  this.initialize_retry_vars();

  this._debug = this.options.debug || false;
  if (this.options.max_attempts &&
      !isNaN(this.options.max_attempts) &&
      this.options.max_attempts > 0) {
    this.max_attempts = +this.options.max_attempts;
  }
  this.retry_max_delay = null;
  if (this.options.retry_max_delay !== undefined &&
      !isNaN(this.options.retry_max_delay) &&
      this.options.retry_max_delay > 0) {
    // Coerce to a number like the sibling options. The guard above accepts
    // numeric strings, and connection_gone() copies this value into
    // retry_delay and then adds it to retry_totaltime; a string there would
    // concatenate instead of add.
    this.retry_max_delay = +this.options.retry_max_delay;
  }
  this.connect_timeout = false;
  if (this.options.connect_timeout &&
      !isNaN(this.options.connect_timeout) &&
      this.options.connect_timeout > 0) {
    this.connect_timeout = +this.options.connect_timeout;
  }

  this.connection.addListener(this.ssl ? "secureConnect" : "connect", function() {
    self.connected = true;

    // Inside this listener `this` is the emitting stream, not the Connection.
    this.setTimeout(self.options.timeout || 0);
    this.setNoDelay();
    this.frameLeft = 0;
    this.framePos = 0;
    this.frame = null;
    self.initialize_retry_vars();
    self.flush_offline_queue();

    self.emit("connect");
  });

  this.connection.addListener("error", function(err) {
    // Only emit the error if no-one else is listening on the connection
    // or if someone is listening on us, because Node turns unhandled
    // 'error' events into exceptions.
    if (self.connection.listeners('error').length === 1 ||
        self.listeners('error').length > 0) {
      self.emit("error", err);
    }
  });

  // Add a close listener
  this.connection.addListener("close", function() {
    self.connection_gone(); // handle close event. try to reconnect
  });

  this.connection.addListener("timeout", function() {
    self.emit("timeout");
  });

  /**
   * Reads one response message and hands it to the matching recv_<fname>
   * method of the owning client. Lives in its own function so that every
   * message gets its own header/client bindings for the callback below,
   * instead of sharing function-scoped vars across loop iterations.
   * Any exception (including InputBufferUnderrunError) propagates to the caller.
   */
  function dispatchResponse(message, transport_with_data) {
    var header = message.readMessageBegin();
    var dummy_seqid = header.rseqid * -1;
    var client = self.client;
    //The Multiplexed Protocol stores a hash of seqid to service names
    //  in seqId2Service. If the SeqId is found in the hash we need to
    //  lookup the appropriate client for this call.
    //  The connection.client object is a single client object when not
    //  multiplexing, when using multiplexing it is a service name keyed
    //  hash of client objects.
    //NOTE: The 2 way interdependencies between protocols, transports,
    //  connections and clients in the Node.js implementation are irregular
    //  and make the implementation difficult to extend and maintain. We
    //  should bring this stuff inline with typical thrift I/O stack
    //  operation soon.
    //  --ra
    var service_name = self.seqId2Service[header.rseqid];
    if (service_name) {
      client = self.client[service_name];
    }

    // Register a wrapper under the negated sequence id and pass that id to
    // recv_<fname>. When the wrapper is invoked it commits the consumed
    // bytes and forwards the result to the callback stored under the real id.
    client._reqs[dummy_seqid] = function(err, success) {
      transport_with_data.commitPosition();

      var callback = client._reqs[header.rseqid];
      delete client._reqs[header.rseqid];
      if (service_name) {
        delete self.seqId2Service[header.rseqid];
      }
      if (callback) {
        callback(err, success);
      }
    };

    if (client['recv_' + header.fname]) {
      client['recv_' + header.fname](message, header.mtype, dummy_seqid);
    } else {
      delete client._reqs[dummy_seqid];
      self.emit("error",
                new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
                         "Received a response to an unknown RPC function"));
    }
  }

  this.connection.addListener("data", self.transport.receiver(function(transport_with_data) {
    var message = new self.protocol(transport_with_data);
    try {
      // Keep decoding until the transport runs out of buffered bytes, which
      // surfaces as an InputBufferUnderrunError and ends the loop.
      while (true) {
        dispatchResponse(message, transport_with_data);
      }
    }
    catch (e) {
      if (e instanceof InputBufferUnderrunError) {
        // Partial message: rewind so it is re-read once more data arrives.
        transport_with_data.rollbackPosition();
      }
      else {
        self.emit('error', e);
      }
    }
  }));
};
util.inherits(Connection, EventEmitter);
| |
/**
 * Ends the connection by calling end() on the underlying stream.
 */
Connection.prototype.end = function() {
  var stream = this.connection;
  stream.end();
};
| |
/**
 * Tears the connection down by calling destroy() on the underlying stream.
 */
Connection.prototype.destroy = function() {
  var stream = this.connection;
  stream.destroy();
};
| |
/**
 * Resets the reconnect bookkeeping to its starting values: no pending
 * timer, zero accumulated retry time, a 150 ms delay, a backoff factor of
 * 1.7 and zero attempts.
 */
Connection.prototype.initialize_retry_vars = function () {
  var conn = this;
  var initial_state = {
    retry_timer: null,
    retry_totaltime: 0,
    retry_delay: 150,
    retry_backoff: 1.7,
    attempts: 0
  };

  Object.keys(initial_state).forEach(function (field) {
    conn[field] = initial_state[field];
  });
};
| |
/**
 * Sends everything that was queued while offline.
 *
 * The queue is swapped for a fresh array before any item is written, so an
 * item that write() queues again lands in the new queue instead of the list
 * being iterated.
 */
Connection.prototype.flush_offline_queue = function () {
  var pending = this.offline_queue;
  this.offline_queue = [];

  pending.forEach(function (item) {
    this.write(item);
  }, this);
};
| |
/**
 * Writes `data` to the underlying stream when connected; otherwise appends
 * it to the offline queue for a later flush_offline_queue() call.
 *
 * @param {*} data - payload handed unchanged to the stream's write().
 */
Connection.prototype.write = function(data) {
  if (this.connected) {
    this.connection.write(data);
    return;
  }
  this.offline_queue.push(data);
};
| |
/**
 * Handles loss of the underlying stream: marks the connection as offline
 * and either schedules a reconnect attempt or gives up and emits "close".
 *
 * Emits "close" immediately when no max_attempts is configured, when the
 * stream is a secure one, or when the attempt budget is used up. Otherwise
 * emits "reconnecting" with `{delay, attempt}` and arms a timer that
 * reconnects via the stored path or port/host. The timer emits "close"
 * instead when the accumulated retry time reaches connect_timeout.
 */
Connection.prototype.connection_gone = function () {
  var conn = this;
  conn.connected = false;

  // If a retry is already in progress, just let that happen
  if (conn.retry_timer) {
    return;
  }

  // We cannot reconnect a secure socket.
  if (!conn.max_attempts || conn.ssl) {
    conn.emit("close");
    return;
  }

  // Grow the delay by the backoff factor, but pin it once it has reached
  // the configured ceiling.
  var ceiling_reached = conn.retry_max_delay !== null &&
                        conn.retry_delay >= conn.retry_max_delay;
  conn.retry_delay = ceiling_reached ?
    conn.retry_max_delay :
    Math.floor(conn.retry_delay * conn.retry_backoff);

  log.debug("Retry connection in " + conn.retry_delay + " ms");

  if (conn.max_attempts && conn.attempts >= conn.max_attempts) {
    conn.retry_timer = null;
    console.error("thrift: Couldn't get thrift connection after " + conn.max_attempts + " attempts.");
    conn.emit("close");
    return;
  }

  conn.attempts += 1;
  conn.emit("reconnecting", {
    delay: conn.retry_delay,
    attempt: conn.attempts
  });

  // Runs when the retry delay elapses.
  function attempt_reconnect() {
    if (conn.connection.destroyed) {
      conn.retry_timer = null;
      return;
    }

    log.debug("Retrying connection...");

    conn.retry_totaltime += conn.retry_delay;

    var out_of_time = conn.connect_timeout &&
                      conn.retry_totaltime >= conn.connect_timeout;
    if (out_of_time) {
      conn.retry_timer = null;
      console.error("thrift: Couldn't get thrift connection after " + conn.retry_totaltime + "ms.");
      conn.emit("close");
      return;
    }

    if (conn.path === undefined) {
      conn.connection.connect(conn.port, conn.host);
    } else {
      conn.connection.connect(conn.path);
    }
    conn.retry_timer = null;
  }

  conn.retry_timer = setTimeout(attempt_reconnect, conn.retry_delay);
};
| |
| exports.createConnection = function(host, port, options) { |
| var stream = net.createConnection( { |
| port: port, |
| host: host, |
| timeout: options.connect_timeout || options.timeout || 0 |
| }); |
| var connection = new Connection(stream, options); |
| connection.host = host; |
| connection.port = port; |
| |
| return connection; |
| }; |
| |
| exports.createUDSConnection = function(path, options) { |
| var stream = net.createConnection(path); |
| var connection = new Connection(stream, options); |
| connection.path = path; |
| |
| return connection; |
| }; |
| |
| exports.createSSLConnection = function(host, port, options) { |
| if (!('secureProtocol' in options) && !('secureOptions' in options)) { |
| options.secureProtocol = "SSLv23_method"; |
| options.secureOptions = constants.SSL_OP_NO_SSLv2 | constants.SSL_OP_NO_SSLv3; |
| } |
| |
| var stream = tls.connect(port, host, options); |
| var connection = new Connection(stream, options); |
| connection.host = host; |
| connection.port = port; |
| |
| return connection; |
| }; |
| |
| |
| exports.createClient = createClient; |
| |
var child_process = require('child_process');

/**
 * Thrift client connection that talks to a spawned child process: requests
 * are written to the child's stdin and responses are decoded from its stdout.
 *
 * The command string is split on single spaces; the first part is the
 * executable and the rest are its arguments. Quoting is not interpreted.
 *
 * Emits "error" for stdin errors and for failures while decoding a
 * response, and "close" when stdin closes.
 *
 * @param {string} command - command line to spawn.
 * @param {Object} [options]
 * @param {Function} [options.transport] - transport class, defaults to TBufferedTransport.
 * @param {Function} [options.protocol] - protocol class, defaults to TBinaryProtocol.
 */
var StdIOConnection = exports.StdIOConnection = function(command, options) {
  var command_parts = command.split(' ');
  command = command_parts[0];
  var args = command_parts.splice(1,command_parts.length -1);
  var child = this.child = child_process.spawn(command,args);

  var self = this;
  EventEmitter.call(this);

  this.connection = child.stdin;
  this.options = options || {};
  this.transport = this.options.transport || TBufferedTransport;
  this.protocol = this.options.protocol || TBinaryProtocol;
  this.offline_queue = [];

  // Mirror the child's stderr and exit status into the debug log, but only
  // attach the listeners when debug logging is active.
  if (log.getLogLevel() === 'debug') {
    this.child.stderr.on('data', function (err) {
      log.debug(err.toString(), 'CHILD ERROR');
    });

    this.child.on('exit', function (code,signal) {
      log.debug(code + ':' + signal, 'CHILD EXITED');
    });
  }

  this.frameLeft = 0;
  this.framePos = 0;
  this.frame = null;
  this.connected = true;

  self.flush_offline_queue();

  this.connection.addListener("error", function(err) {
    self.emit("error", err);
  });

  // Add a close listener
  this.connection.addListener("close", function() {
    self.emit("close");
  });

  child.stdout.addListener("data", self.transport.receiver(function(transport_with_data) {
    var message = new self.protocol(transport_with_data);
    try {
      var header = message.readMessageBegin();
      var dummy_seqid = header.rseqid * -1;
      var client = self.client;

      // Register a wrapper under the negated sequence id and pass that id to
      // recv_<fname>. When the wrapper is invoked it commits the consumed
      // bytes and forwards the result to the callback stored under the real id.
      client._reqs[dummy_seqid] = function(err, success){
        transport_with_data.commitPosition();

        var callback = client._reqs[header.rseqid];
        delete client._reqs[header.rseqid];
        if (callback) {
          callback(err, success);
        }
      };

      if (client['recv_' + header.fname]) {
        client['recv_' + header.fname](message, header.mtype, dummy_seqid);
      } else {
        // Same handling as Connection: drop the wrapper and report the
        // unknown function instead of failing on an undefined call.
        delete client._reqs[dummy_seqid];
        self.emit("error",
                  new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
                           "Received a response to an unknown RPC function"));
      }
    }
    catch (e) {
      if (e instanceof InputBufferUnderrunError) {
        // Partial message: rewind so it is re-read once more data arrives.
        transport_with_data.rollbackPosition();
      }
      else {
        // Report through the "error" event, as Connection does. Throwing
        // from a stream "data" listener cannot be caught by any caller.
        // With no "error" listener attached, emit() still throws the error.
        self.emit('error', e);
      }
    }
  }));
};

util.inherits(StdIOConnection, EventEmitter);
| |
/**
 * Ends the connection by calling end() on the child's stdin stream.
 */
StdIOConnection.prototype.end = function() {
  var child_stdin = this.connection;
  child_stdin.end();
};
| |
/**
 * Sends everything that was queued while offline.
 *
 * The queue is swapped for a fresh array before any item is written, so an
 * item that write() queues again lands in the new queue instead of the list
 * being iterated.
 */
StdIOConnection.prototype.flush_offline_queue = function () {
  var pending = this.offline_queue;
  this.offline_queue = [];

  pending.forEach(function (item) {
    this.write(item);
  }, this);
};
| |
/**
 * Writes `data` to the child's stdin when connected; otherwise appends it
 * to the offline queue for a later flush_offline_queue() call.
 *
 * @param {*} data - payload handed unchanged to the stream's write().
 */
StdIOConnection.prototype.write = function(data) {
  if (this.connected) {
    this.connection.write(data);
    return;
  }
  this.offline_queue.push(data);
};
| |
| exports.createStdIOConnection = function(command,options){ |
| return new StdIOConnection(command,options); |
| }; |
| |
| exports.createStdIOClient = createClient; |